Mercurial > libervia-backend
comparison libervia/backend/plugins/plugin_xep_0373.py @ 4071:4b842c1fb686
refactoring: renamed `sat` package to `libervia.backend`
author | Goffi <goffi@goffi.org> |
---|---|
date | Fri, 02 Jun 2023 11:49:51 +0200 |
parents | sat/plugins/plugin_xep_0373.py@524856bd7b19 |
children | 040095a5dc7f |
comparison
equal
deleted
inserted
replaced
4070:d10748475025 | 4071:4b842c1fb686 |
---|---|
1 #!/usr/bin/env python3 | |
2 | |
3 # Libervia plugin for OpenPGP for XMPP | |
4 # Copyright (C) 2022-2022 Tim Henkes (me@syndace.dev) | |
5 | |
6 # This program is free software: you can redistribute it and/or modify | |
7 # it under the terms of the GNU Affero General Public License as published by | |
8 # the Free Software Foundation, either version 3 of the License, or | |
9 # (at your option) any later version. | |
10 | |
11 # This program is distributed in the hope that it will be useful, | |
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of | |
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |
14 # GNU Affero General Public License for more details. | |
15 | |
16 # You should have received a copy of the GNU Affero General Public License | |
17 # along with this program. If not, see <http://www.gnu.org/licenses/>. | |
18 | |
19 from abc import ABC, abstractmethod | |
20 import base64 | |
21 from datetime import datetime, timezone | |
22 import enum | |
23 import secrets | |
24 import string | |
25 from typing import Any, Dict, Iterable, List, Literal, Optional, Set, Tuple, cast | |
26 from xml.sax.saxutils import quoteattr | |
27 | |
28 from typing_extensions import Final, NamedTuple, Never, assert_never | |
29 from wokkel import muc, pubsub | |
30 from wokkel.disco import DiscoFeature, DiscoInfo | |
31 import xmlschema | |
32 | |
33 from libervia.backend.core import exceptions | |
34 from libervia.backend.core.constants import Const as C | |
35 from libervia.backend.core.core_types import SatXMPPEntity | |
36 from libervia.backend.core.i18n import _, D_ | |
37 from libervia.backend.core.log import getLogger, Logger | |
38 from libervia.backend.core.sat_main import SAT | |
39 from libervia.backend.core.xmpp import SatXMPPClient | |
40 from libervia.backend.memory import persistent | |
41 from libervia.backend.plugins.plugin_xep_0045 import XEP_0045 | |
42 from libervia.backend.plugins.plugin_xep_0060 import XEP_0060 | |
43 from libervia.backend.plugins.plugin_xep_0163 import XEP_0163 | |
44 from libervia.backend.tools.xmpp_datetime import format_datetime, parse_datetime | |
45 from libervia.backend.tools import xml_tools | |
46 from twisted.internet import defer | |
47 from twisted.words.protocols.jabber import jid | |
48 from twisted.words.xish import domish | |
49 | |
50 try: | |
51 import gpg | |
52 except ImportError as import_error: | |
53 raise exceptions.MissingModule( | |
54 "You are missing the 'gpg' package required by the OX plugin. The recommended" | |
55 " installation method is via your operating system's package manager, since the" | |
56 " version of the library has to match the version of your GnuPG installation. See" | |
57 " https://wiki.python.org/moin/GnuPrivacyGuard#Accessing_GnuPG_via_gpgme" | |
58 ) from import_error | |
59 | |
60 | |
61 __all__ = [ # pylint: disable=unused-variable | |
62 "PLUGIN_INFO", | |
63 "NS_OX", | |
64 "XEP_0373", | |
65 "VerificationError", | |
66 "XMPPInteractionFailed", | |
67 "InvalidPacket", | |
68 "DecryptionFailed", | |
69 "VerificationFailed", | |
70 "UnknownKey", | |
71 "GPGProviderError", | |
72 "GPGPublicKey", | |
73 "GPGSecretKey", | |
74 "GPGProvider", | |
75 "PublicKeyMetadata", | |
76 "gpg_provider", | |
77 "TrustLevel" | |
78 ] | |
79 | |
80 | |
81 log = cast(Logger, getLogger(__name__)) # type: ignore[no-untyped-call] | |
82 | |
83 | |
84 PLUGIN_INFO = { | |
85 C.PI_NAME: "XEP-0373", | |
86 C.PI_IMPORT_NAME: "XEP-0373", | |
87 C.PI_TYPE: "SEC", | |
88 C.PI_PROTOCOLS: [ "XEP-0373" ], | |
89 C.PI_DEPENDENCIES: [ "XEP-0060", "XEP-0163" ], | |
90 C.PI_RECOMMENDATIONS: [], | |
91 C.PI_MAIN: "XEP_0373", | |
92 C.PI_HANDLER: "no", | |
93 C.PI_DESCRIPTION: D_("Implementation of OpenPGP for XMPP"), | |
94 } | |
95 | |
96 | |
97 NS_OX: Final = "urn:xmpp:openpgp:0" | |
98 | |
99 | |
100 PARAM_CATEGORY = "Security" | |
101 PARAM_NAME = "ox_policy" | |
102 STR_KEY_PUBLIC_KEYS_METADATA = "/public-keys-metadata/{}" | |
103 | |
104 | |
105 class VerificationError(Exception): | |
106 """ | |
107 Raised by verifying methods of :class:`XEP_0373` on semantical verification errors. | |
108 """ | |
109 | |
110 | |
111 class XMPPInteractionFailed(Exception): | |
112 """ | |
113 Raised by methods of :class:`XEP_0373` on XMPP interaction failure. The reason this | |
114 exception exists is that the exceptions raised by XMPP interactions are not properly | |
115 documented for the most part, thus all exceptions are caught and wrapped in instances | |
116 of this class. | |
117 """ | |
118 | |
119 | |
120 class InvalidPacket(ValueError): | |
121 """ | |
122 Raised by methods of :class:`GPGProvider` when an invalid packet is encountered. | |
123 """ | |
124 | |
125 | |
126 class DecryptionFailed(Exception): | |
127 """ | |
128 Raised by methods of :class:`GPGProvider` on decryption failures. | |
129 """ | |
130 | |
131 | |
132 class VerificationFailed(Exception): | |
133 """ | |
134 Raised by methods of :class:`GPGProvider` on verification failures. | |
135 """ | |
136 | |
137 | |
138 class UnknownKey(ValueError): | |
139 """ | |
140 Raised by methods of :class:`GPGProvider` when an unknown key is referenced. | |
141 """ | |
142 | |
143 | |
144 class GPGProviderError(Exception): | |
145 """ | |
146 Raised by methods of :class:`GPGProvider` on internal errors. | |
147 """ | |
148 | |
149 | |
150 class GPGPublicKey(ABC): | |
151 """ | |
152 Interface describing a GPG public key. | |
153 """ | |
154 | |
155 @property | |
156 @abstractmethod | |
157 def fingerprint(self) -> str: | |
158 """ | |
159 @return: The OpenPGP v4 fingerprint string of this public key. | |
160 """ | |
161 | |
162 | |
163 class GPGSecretKey(ABC): | |
164 """ | |
165 Interface descibing a GPG secret key. | |
166 """ | |
167 | |
168 @property | |
169 @abstractmethod | |
170 def public_key(self) -> GPGPublicKey: | |
171 """ | |
172 @return: The public key corresponding to this secret key. | |
173 """ | |
174 | |
175 | |
176 class GPGProvider(ABC): | |
177 """ | |
178 Interface describing a GPG provider, i.e. a library or framework providing GPG | |
179 encryption, signing and key management. | |
180 | |
181 All methods may raise :class:`GPGProviderError` in addition to those exception types | |
182 listed explicitly. | |
183 | |
184 # TODO: Check keys for revoked, disabled and expired everywhere and exclude those (?) | |
185 """ | |
186 | |
187 @abstractmethod | |
188 def export_public_key(self, public_key: GPGPublicKey) -> bytes: | |
189 """Export a public key in a key material packet according to RFC 4880 §5.5. | |
190 | |
191 Do not use OpenPGP's ASCII Armor. | |
192 | |
193 @param public_key: The public key to export. | |
194 @return: The packet containing the exported public key. | |
195 @raise UnknownKey: if the public key is not available. | |
196 """ | |
197 | |
198 @abstractmethod | |
199 def import_public_key(self, packet: bytes) -> GPGPublicKey: | |
200 """import a public key from a key material packet according to RFC 4880 §5.5. | |
201 | |
202 OpenPGP's ASCII Armor is not used. | |
203 | |
204 @param packet: A packet containing an exported public key. | |
205 @return: The public key imported from the packet. | |
206 @raise InvalidPacket: if the packet is either syntactically or semantically deemed | |
207 invalid. | |
208 | |
209 @warning: Only packets of version 4 or higher may be accepted, packets below | |
210 version 4 MUST be rejected. | |
211 """ | |
212 | |
213 @abstractmethod | |
214 def backup_secret_key(self, secret_key: GPGSecretKey) -> bytes: | |
215 """Export a secret key for transfer according to RFC 4880 §11.1. | |
216 | |
217 Do not encrypt the secret data, i.e. set the octet indicating string-to-key usage | |
218 conventions to zero in the corresponding secret-key packet according to RFC 4880 | |
219 §5.5.3. Do not use OpenPGP's ASCII Armor. | |
220 | |
221 @param secret_key: The secret key to export. | |
222 @return: The binary blob containing the exported secret key. | |
223 @raise UnknownKey: if the secret key is not available. | |
224 """ | |
225 | |
226 @abstractmethod | |
227 def restore_secret_keys(self, data: bytes) -> Set[GPGSecretKey]: | |
228 """Restore secret keys exported for transfer according to RFC 4880 §11.1. | |
229 | |
230 The secret data is not encrypted, i.e. the octet indicating string-to-key usage | |
231 conventions in the corresponding secret-key packets according to RFC 4880 §5.5.3 | |
232 are set to zero. OpenPGP's ASCII Armor is not used. | |
233 | |
234 @param data: Concatenation of one or more secret keys exported for transfer. | |
235 @return: The secret keys imported from the data. | |
236 @raise InvalidPacket: if the data or one of the packets included in the data is | |
237 either syntactically or semantically deemed invalid. | |
238 | |
239 @warning: Only packets of version 4 or higher may be accepted, packets below | |
240 version 4 MUST be rejected. | |
241 """ | |
242 | |
243 @abstractmethod | |
244 def encrypt_symmetrically(self, plaintext: bytes, password: str) -> bytes: | |
245 """Encrypt data symmetrically according to RFC 4880 §5.3. | |
246 | |
247 The password is used to build a Symmetric-Key Encrypted Session Key packet which | |
248 precedes the Symmetrically Encrypted Data packet that holds the encrypted data. | |
249 | |
250 @param plaintext: The data to encrypt. | |
251 @param password: The password to encrypt the data with. | |
252 @return: The encrypted data. | |
253 """ | |
254 | |
255 @abstractmethod | |
256 def decrypt_symmetrically(self, ciphertext: bytes, password: str) -> bytes: | |
257 """Decrypt data symmetrically according to RFC 4880 §5.3. | |
258 | |
259 The ciphertext consists of a Symmetrically Encrypted Data packet that holds the | |
260 encrypted data, preceded by a Symmetric-Key Encrypted Session Key packet using the | |
261 password. | |
262 | |
263 @param ciphertext: The ciphertext. | |
264 @param password: The password to decrypt the data with. | |
265 @return: The plaintext. | |
266 @raise DecryptionFailed: on decryption failure. | |
267 """ | |
268 | |
269 @abstractmethod | |
270 def sign(self, data: bytes, secret_keys: Set[GPGSecretKey]) -> bytes: | |
271 """Sign some data. | |
272 | |
273 OpenPGP's ASCII Armor is not used. | |
274 | |
275 @param data: The data to sign. | |
276 @param secret_keys: The secret keys to sign the data with. | |
277 @return: The OpenPGP message carrying the signed data. | |
278 """ | |
279 | |
280 @abstractmethod | |
281 def sign_detached(self, data: bytes, secret_keys: Set[GPGSecretKey]) -> bytes: | |
282 """Sign some data. Create the signature detached from the data. | |
283 | |
284 OpenPGP's ASCII Armor is not used. | |
285 | |
286 @param data: The data to sign. | |
287 @param secret_keys: The secret keys to sign the data with. | |
288 @return: The OpenPGP message carrying the detached signature. | |
289 """ | |
290 | |
291 @abstractmethod | |
292 def verify(self, signed_data: bytes, public_keys: Set[GPGPublicKey]) -> bytes: | |
293 """Verify signed data. | |
294 | |
295 OpenPGP's ASCII Armor is not used. | |
296 | |
297 @param signed_data: The signed data as an OpenPGP message. | |
298 @param public_keys: The public keys to verify the signature with. | |
299 @return: The verified and unpacked data. | |
300 @raise VerificationFailed: if the data could not be verified. | |
301 | |
302 @warning: For implementors: it has to be confirmed that a valid signature by one | |
303 of the public keys is available. | |
304 """ | |
305 | |
306 @abstractmethod | |
307 def verify_detached( | |
308 self, | |
309 data: bytes, | |
310 signature: bytes, | |
311 public_keys: Set[GPGPublicKey] | |
312 ) -> None: | |
313 """Verify signed data, where the signature was created detached from the data. | |
314 | |
315 OpenPGP's ASCII Armor is not used. | |
316 | |
317 @param data: The data. | |
318 @param signature: The signature as an OpenPGP message. | |
319 @param public_keys: The public keys to verify the signature with. | |
320 @raise VerificationFailed: if the data could not be verified. | |
321 | |
322 @warning: For implementors: it has to be confirmed that a valid signature by one | |
323 of the public keys is available. | |
324 """ | |
325 | |
326 @abstractmethod | |
327 def encrypt( | |
328 self, | |
329 plaintext: bytes, | |
330 public_keys: Set[GPGPublicKey], | |
331 signing_keys: Optional[Set[GPGSecretKey]] = None | |
332 ) -> bytes: | |
333 """Encrypt and optionally sign some data. | |
334 | |
335 OpenPGP's ASCII Armor is not used. | |
336 | |
337 @param plaintext: The data to encrypt and optionally sign. | |
338 @param public_keys: The public keys to encrypt the data for. | |
339 @param signing_keys: The secret keys to sign the data with. | |
340 @return: The OpenPGP message carrying the encrypted and optionally signed data. | |
341 """ | |
342 | |
343 @abstractmethod | |
344 def decrypt( | |
345 self, | |
346 ciphertext: bytes, | |
347 secret_keys: Set[GPGSecretKey], | |
348 public_keys: Optional[Set[GPGPublicKey]] = None | |
349 ) -> bytes: | |
350 """Decrypt and optionally verify some data. | |
351 | |
352 OpenPGP's ASCII Armor is not used. | |
353 | |
354 @param ciphertext: The encrypted and optionally signed data as an OpenPGP message. | |
355 @param secret_keys: The secret keys to attempt decryption with. | |
356 @param public_keys: The public keys to verify the optional signature with. | |
357 @return: The decrypted, optionally verified and unpacked data. | |
358 @raise DecryptionFailed: on decryption failure. | |
359 @raise VerificationFailed: if the data could not be verified. | |
360 | |
361 @warning: For implementors: it has to be confirmed that the data was decrypted | |
362 using one of the secret keys and that a valid signature by one of the public | |
363 keys is available in case the data is signed. | |
364 """ | |
365 | |
366 @abstractmethod | |
367 def list_public_keys(self, user_id: str) -> Set[GPGPublicKey]: | |
368 """List public keys. | |
369 | |
370 @param user_id: The user id. | |
371 @return: The set of public keys available for this user id. | |
372 """ | |
373 | |
374 @abstractmethod | |
375 def list_secret_keys(self, user_id: str) -> Set[GPGSecretKey]: | |
376 """List secret keys. | |
377 | |
378 @param user_id: The user id. | |
379 @return: The set of secret keys available for this user id. | |
380 """ | |
381 | |
382 @abstractmethod | |
383 def can_sign(self, public_key: GPGPublicKey) -> bool: | |
384 """ | |
385 @return: Whether the public key belongs to a key pair capable of signing. | |
386 """ | |
387 | |
388 @abstractmethod | |
389 def can_encrypt(self, public_key: GPGPublicKey) -> bool: | |
390 """ | |
391 @return: Whether the public key belongs to a key pair capable of encryption. | |
392 """ | |
393 | |
394 @abstractmethod | |
395 def create_key(self, user_id: str) -> GPGSecretKey: | |
396 """Create a new GPG key, capable of signing and encryption. | |
397 | |
398 The key is generated without password protection and without expiration. If a key | |
399 with the same user id already exists, a new key is created anyway. | |
400 | |
401 @param user_id: The user id to assign to the new key. | |
402 @return: The new key. | |
403 """ | |
404 | |
405 | |
406 class GPGME_GPGPublicKey(GPGPublicKey): | |
407 """ | |
408 GPG public key implementation based on GnuPG Made Easy (GPGME). | |
409 """ | |
410 | |
411 def __init__(self, key_obj: Any) -> None: | |
412 """ | |
413 @param key_obj: The GPGME key object. | |
414 """ | |
415 | |
416 self.__key_obj = key_obj | |
417 | |
418 @property | |
419 def fingerprint(self) -> str: | |
420 return self.__key_obj.fpr | |
421 | |
422 @property | |
423 def key_obj(self) -> Any: | |
424 return self.__key_obj | |
425 | |
426 | |
427 class GPGME_GPGSecretKey(GPGSecretKey): | |
428 """ | |
429 GPG secret key implementation based on GnuPG Made Easy (GPGME). | |
430 """ | |
431 | |
432 def __init__(self, public_key: GPGME_GPGPublicKey) -> None: | |
433 """ | |
434 @param public_key: The public key corresponding to this secret key. | |
435 """ | |
436 | |
437 self.__public_key = public_key | |
438 | |
439 @property | |
440 def public_key(self) -> GPGME_GPGPublicKey: | |
441 return self.__public_key | |
442 | |
443 | |
444 class GPGME_GPGProvider(GPGProvider): | |
445 """ | |
446 GPG provider implementation based on GnuPG Made Easy (GPGME). | |
447 """ | |
448 | |
449 def __init__(self, home_dir: Optional[str] = None) -> None: | |
450 """ | |
451 @param home_dir: Optional GPG home directory path to use for all operations. | |
452 """ | |
453 | |
454 self.__home_dir = home_dir | |
455 | |
456 def export_public_key(self, public_key: GPGPublicKey) -> bytes: | |
457 assert isinstance(public_key, GPGME_GPGPublicKey) | |
458 | |
459 pattern = public_key.fingerprint | |
460 | |
461 with gpg.Context(home_dir=self.__home_dir) as c: | |
462 try: | |
463 result = c.key_export_minimal(pattern) | |
464 except gpg.errors.GPGMEError as e: | |
465 raise GPGProviderError("Internal GPGME error") from e | |
466 | |
467 if result is None: | |
468 raise UnknownKey(f"Public key {pattern} not found.") | |
469 | |
470 return result | |
471 | |
472 def import_public_key(self, packet: bytes) -> GPGPublicKey: | |
473 # TODO | |
474 # - Reject packets older than version 4 | |
475 # - Check whether it's actually a public key (through packet inspection?) | |
476 | |
477 with gpg.Context(home_dir=self.__home_dir) as c: | |
478 try: | |
479 result = c.key_import(packet) | |
480 except gpg.errors.GPGMEError as e: | |
481 # From looking at the code, `key_import` never raises. The documentation | |
482 # says it does though, so this is included for future-proofness. | |
483 raise GPGProviderError("Internal GPGME error") from e | |
484 | |
485 if not hasattr(result, "considered"): | |
486 raise InvalidPacket( | |
487 f"Data not considered for public key import: {result}" | |
488 ) | |
489 | |
490 if len(result.imports) != 1: | |
491 raise InvalidPacket( | |
492 "Public key packet does not contain exactly one public key (not" | |
493 " counting subkeys)." | |
494 ) | |
495 | |
496 try: | |
497 key_obj = c.get_key(result.imports[0].fpr, secret=False) | |
498 except gpg.errors.GPGMEError as e: | |
499 raise GPGProviderError("Internal GPGME error") from e | |
500 except gpg.errors.KeyError as e: | |
501 raise GPGProviderError("Newly imported public key not found") from e | |
502 | |
503 return GPGME_GPGPublicKey(key_obj) | |
504 | |
505 def backup_secret_key(self, secret_key: GPGSecretKey) -> bytes: | |
506 assert isinstance(secret_key, GPGME_GPGSecretKey) | |
507 # TODO | |
508 # - Handle password protection/pinentry | |
509 # - Make sure the key is exported unencrypted | |
510 | |
511 pattern = secret_key.public_key.fingerprint | |
512 | |
513 with gpg.Context(home_dir=self.__home_dir) as c: | |
514 try: | |
515 result = c.key_export_secret(pattern) | |
516 except gpg.errors.GPGMEError as e: | |
517 raise GPGProviderError("Internal GPGME error") from e | |
518 | |
519 if result is None: | |
520 raise UnknownKey(f"Secret key {pattern} not found.") | |
521 | |
522 return result | |
523 | |
524 def restore_secret_keys(self, data: bytes) -> Set[GPGSecretKey]: | |
525 # TODO | |
526 # - Reject packets older than version 4 | |
527 # - Check whether it's actually secret keys (through packet inspection?) | |
528 | |
529 with gpg.Context(home_dir=self.__home_dir) as c: | |
530 try: | |
531 result = c.key_import(data) | |
532 except gpg.errors.GPGMEError as e: | |
533 # From looking at the code, `key_import` never raises. The documentation | |
534 # says it does though, so this is included for future-proofness. | |
535 raise GPGProviderError("Internal GPGME error") from e | |
536 | |
537 if not hasattr(result, "considered"): | |
538 raise InvalidPacket( | |
539 f"Data not considered for secret key import: {result}" | |
540 ) | |
541 | |
542 if len(result.imports) == 0: | |
543 raise InvalidPacket("Secret key packet does not contain a secret key.") | |
544 | |
545 secret_keys = set() | |
546 for import_status in result.imports: | |
547 try: | |
548 key_obj = c.get_key(import_status.fpr, secret=True) | |
549 except gpg.errors.GPGMEError as e: | |
550 raise GPGProviderError("Internal GPGME error") from e | |
551 except gpg.errors.KeyError as e: | |
552 raise GPGProviderError("Newly imported secret key not found") from e | |
553 | |
554 secret_keys.add(GPGME_GPGSecretKey(GPGME_GPGPublicKey(key_obj))) | |
555 | |
556 return secret_keys | |
557 | |
558 def encrypt_symmetrically(self, plaintext: bytes, password: str) -> bytes: | |
559 with gpg.Context(home_dir=self.__home_dir) as c: | |
560 try: | |
561 ciphertext, __, __ = c.encrypt(plaintext, passphrase=password, sign=False) | |
562 except gpg.errors.GPGMEError as e: | |
563 raise GPGProviderError("Internal GPGME error") from e | |
564 | |
565 return ciphertext | |
566 | |
567 def decrypt_symmetrically(self, ciphertext: bytes, password: str) -> bytes: | |
568 with gpg.Context(home_dir=self.__home_dir) as c: | |
569 try: | |
570 plaintext, __, __ = c.decrypt( | |
571 ciphertext, | |
572 passphrase=password, | |
573 verify=False | |
574 ) | |
575 except gpg.errors.GPGMEError as e: | |
576 # TODO: Find out what kind of error is raised if the password is wrong and | |
577 # re-raise it as DecryptionFailed instead. | |
578 raise GPGProviderError("Internal GPGME error") from e | |
579 except gpg.UnsupportedAlgorithm as e: | |
580 raise DecryptionFailed("Unsupported algorithm") from e | |
581 | |
582 return plaintext | |
583 | |
584 def sign(self, data: bytes, secret_keys: Set[GPGSecretKey]) -> bytes: | |
585 signers = [] | |
586 for secret_key in secret_keys: | |
587 assert isinstance(secret_key, GPGME_GPGSecretKey) | |
588 | |
589 signers.append(secret_key.public_key.key_obj) | |
590 | |
591 with gpg.Context(home_dir=self.__home_dir, signers=signers) as c: | |
592 try: | |
593 signed_data, __ = c.sign(data) | |
594 except gpg.errors.GPGMEError as e: | |
595 raise GPGProviderError("Internal GPGME error") from e | |
596 except gpg.errors.InvalidSigners as e: | |
597 raise GPGProviderError( | |
598 "At least one of the secret keys is invalid for signing" | |
599 ) from e | |
600 | |
601 return signed_data | |
602 | |
603 def sign_detached(self, data: bytes, secret_keys: Set[GPGSecretKey]) -> bytes: | |
604 signers = [] | |
605 for secret_key in secret_keys: | |
606 assert isinstance(secret_key, GPGME_GPGSecretKey) | |
607 | |
608 signers.append(secret_key.public_key.key_obj) | |
609 | |
610 with gpg.Context(home_dir=self.__home_dir, signers=signers) as c: | |
611 try: | |
612 signature, __ = c.sign(data, mode=gpg.constants.sig.mode.DETACH) | |
613 except gpg.errors.GPGMEError as e: | |
614 raise GPGProviderError("Internal GPGME error") from e | |
615 except gpg.errors.InvalidSigners as e: | |
616 raise GPGProviderError( | |
617 "At least one of the secret keys is invalid for signing" | |
618 ) from e | |
619 | |
620 return signature | |
621 | |
622 def verify(self, signed_data: bytes, public_keys: Set[GPGPublicKey]) -> bytes: | |
623 with gpg.Context(home_dir=self.__home_dir) as c: | |
624 try: | |
625 data, result = c.verify(signed_data) | |
626 except gpg.errors.GPGMEError as e: | |
627 raise GPGProviderError("Internal GPGME error") from e | |
628 except gpg.errors.BadSignatures as e: | |
629 raise VerificationFailed("Bad signatures on signed data") from e | |
630 | |
631 valid_signature_found = False | |
632 for public_key in public_keys: | |
633 assert isinstance(public_key, GPGME_GPGPublicKey) | |
634 | |
635 for subkey in public_key.key_obj.subkeys: | |
636 for sig in result.signatures: | |
637 if subkey.can_sign and subkey.fpr == sig.fpr: | |
638 valid_signature_found = True | |
639 | |
640 if not valid_signature_found: | |
641 raise VerificationFailed( | |
642 "Data not signed by one of the expected public keys" | |
643 ) | |
644 | |
645 return data | |
646 | |
647 def verify_detached( | |
648 self, | |
649 data: bytes, | |
650 signature: bytes, | |
651 public_keys: Set[GPGPublicKey] | |
652 ) -> None: | |
653 with gpg.Context(home_dir=self.__home_dir) as c: | |
654 try: | |
655 __, result = c.verify(data, signature=signature) | |
656 except gpg.errors.GPGMEError as e: | |
657 raise GPGProviderError("Internal GPGME error") from e | |
658 except gpg.errors.BadSignatures as e: | |
659 raise VerificationFailed("Bad signatures on signed data") from e | |
660 | |
661 valid_signature_found = False | |
662 for public_key in public_keys: | |
663 assert isinstance(public_key, GPGME_GPGPublicKey) | |
664 | |
665 for subkey in public_key.key_obj.subkeys: | |
666 for sig in result.signatures: | |
667 if subkey.can_sign and subkey.fpr == sig.fpr: | |
668 valid_signature_found = True | |
669 | |
670 if not valid_signature_found: | |
671 raise VerificationFailed( | |
672 "Data not signed by one of the expected public keys" | |
673 ) | |
674 | |
675 def encrypt( | |
676 self, | |
677 plaintext: bytes, | |
678 public_keys: Set[GPGPublicKey], | |
679 signing_keys: Optional[Set[GPGSecretKey]] = None | |
680 ) -> bytes: | |
681 recipients = [] | |
682 for public_key in public_keys: | |
683 assert isinstance(public_key, GPGME_GPGPublicKey) | |
684 | |
685 recipients.append(public_key.key_obj) | |
686 | |
687 signers = [] | |
688 if signing_keys is not None: | |
689 for secret_key in signing_keys: | |
690 assert isinstance(secret_key, GPGME_GPGSecretKey) | |
691 | |
692 signers.append(secret_key.public_key.key_obj) | |
693 | |
694 sign = signing_keys is not None | |
695 | |
696 with gpg.Context(home_dir=self.__home_dir, signers=signers) as c: | |
697 try: | |
698 ciphertext, __, __ = c.encrypt( | |
699 plaintext, | |
700 recipients=recipients, | |
701 sign=sign, | |
702 always_trust=True, | |
703 add_encrypt_to=True | |
704 ) | |
705 except gpg.errors.GPGMEError as e: | |
706 raise GPGProviderError("Internal GPGME error") from e | |
707 except gpg.errors.InvalidRecipients as e: | |
708 raise GPGProviderError( | |
709 "At least one of the public keys is invalid for encryption" | |
710 ) from e | |
711 except gpg.errors.InvalidSigners as e: | |
712 raise GPGProviderError( | |
713 "At least one of the signing keys is invalid for signing" | |
714 ) from e | |
715 | |
716 return ciphertext | |
717 | |
718 def decrypt( | |
719 self, | |
720 ciphertext: bytes, | |
721 secret_keys: Set[GPGSecretKey], | |
722 public_keys: Optional[Set[GPGPublicKey]] = None | |
723 ) -> bytes: | |
724 verify = public_keys is not None | |
725 | |
726 with gpg.Context(home_dir=self.__home_dir) as c: | |
727 try: | |
728 plaintext, result, verify_result = c.decrypt( | |
729 ciphertext, | |
730 verify=verify | |
731 ) | |
732 except gpg.errors.GPGMEError as e: | |
733 raise GPGProviderError("Internal GPGME error") from e | |
734 except gpg.UnsupportedAlgorithm as e: | |
735 raise DecryptionFailed("Unsupported algorithm") from e | |
736 | |
737 # TODO: Check whether the data was decrypted using one of the expected secret | |
738 # keys | |
739 | |
740 if public_keys is not None: | |
741 valid_signature_found = False | |
742 for public_key in public_keys: | |
743 assert isinstance(public_key, GPGME_GPGPublicKey) | |
744 | |
745 for subkey in public_key.key_obj.subkeys: | |
746 for sig in verify_result.signatures: | |
747 if subkey.can_sign and subkey.fpr == sig.fpr: | |
748 valid_signature_found = True | |
749 | |
750 if not valid_signature_found: | |
751 raise VerificationFailed( | |
752 "Data not signed by one of the expected public keys" | |
753 ) | |
754 | |
755 return plaintext | |
756 | |
757 def list_public_keys(self, user_id: str) -> Set[GPGPublicKey]: | |
758 with gpg.Context(home_dir=self.__home_dir) as c: | |
759 try: | |
760 return { | |
761 GPGME_GPGPublicKey(key) | |
762 for key | |
763 in c.keylist(pattern=user_id, secret=False) | |
764 } | |
765 except gpg.errors.GPGMEError as e: | |
766 raise GPGProviderError("Internal GPGME error") from e | |
767 | |
768 def list_secret_keys(self, user_id: str) -> Set[GPGSecretKey]: | |
769 with gpg.Context(home_dir=self.__home_dir) as c: | |
770 try: | |
771 return { | |
772 GPGME_GPGSecretKey(GPGME_GPGPublicKey(key)) | |
773 for key | |
774 in c.keylist(pattern=user_id, secret=True) | |
775 } | |
776 except gpg.errors.GPGMEError as e: | |
777 raise GPGProviderError("Internal GPGME error") from e | |
778 | |
779 def can_sign(self, public_key: GPGPublicKey) -> bool: | |
780 assert isinstance(public_key, GPGME_GPGPublicKey) | |
781 | |
782 return any(subkey.can_sign for subkey in public_key.key_obj.subkeys) | |
783 | |
784 def can_encrypt(self, public_key: GPGPublicKey) -> bool: | |
785 assert isinstance(public_key, GPGME_GPGPublicKey) | |
786 | |
787 return any(subkey.can_encrypt for subkey in public_key.key_obj.subkeys) | |
788 | |
789 def create_key(self, user_id: str) -> GPGSecretKey: | |
790 with gpg.Context(home_dir=self.__home_dir) as c: | |
791 try: | |
792 result = c.create_key( | |
793 user_id, | |
794 expires=False, | |
795 sign=True, | |
796 encrypt=True, | |
797 certify=False, | |
798 authenticate=False, | |
799 force=True | |
800 ) | |
801 | |
802 key_obj = c.get_key(result.fpr, secret=True) | |
803 except gpg.errors.GPGMEError as e: | |
804 raise GPGProviderError("Internal GPGME error") from e | |
805 except gpg.errors.KeyError as e: | |
806 raise GPGProviderError("Newly created key not found") from e | |
807 | |
808 return GPGME_GPGSecretKey(GPGME_GPGPublicKey(key_obj)) | |
809 | |
810 | |
811 class PublicKeyMetadata(NamedTuple): | |
812 """ | |
813 Metadata about a published public key. | |
814 """ | |
815 | |
816 fingerprint: str | |
817 timestamp: datetime | |
818 | |
819 | |
820 @enum.unique | |
821 class TrustLevel(enum.Enum): | |
822 """ | |
823 The trust levels required for BTBV and manual trust. | |
824 """ | |
825 | |
826 TRUSTED: str = "TRUSTED" | |
827 BLINDLY_TRUSTED: str = "BLINDLY_TRUSTED" | |
828 UNDECIDED: str = "UNDECIDED" | |
829 DISTRUSTED: str = "DISTRUSTED" | |
830 | |
831 | |
832 OPENPGP_SCHEMA = xmlschema.XMLSchema("""<?xml version="1.0" encoding="utf8"?> | |
833 <xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema" | |
834 targetNamespace="urn:xmpp:openpgp:0" | |
835 xmlns="urn:xmpp:openpgp:0"> | |
836 | |
837 <xs:element name="openpgp" type="xs:base64Binary"/> | |
838 </xs:schema> | |
839 """) | |
840 | |
841 | |
842 # The following schema needs verion 1.1 of XML Schema, which is not supported by lxml. | |
843 # Luckily, xmlschema exists, which is a clean, well maintained, cross-platform | |
844 # implementation of XML Schema, including version 1.1. | |
845 CONTENT_SCHEMA = xmlschema.XMLSchema11("""<?xml version="1.1" encoding="utf8"?> | |
846 <xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema" | |
847 targetNamespace="urn:xmpp:openpgp:0" | |
848 xmlns="urn:xmpp:openpgp:0"> | |
849 | |
850 <xs:element name="signcrypt"> | |
851 <xs:complexType> | |
852 <xs:all> | |
853 <xs:element ref="to" maxOccurs="unbounded"/> | |
854 <xs:element ref="time"/> | |
855 <xs:element ref="rpad" minOccurs="0"/> | |
856 <xs:element ref="payload"/> | |
857 </xs:all> | |
858 </xs:complexType> | |
859 </xs:element> | |
860 | |
861 <xs:element name="sign"> | |
862 <xs:complexType> | |
863 <xs:all> | |
864 <xs:element ref="to" maxOccurs="unbounded"/> | |
865 <xs:element ref="time"/> | |
866 <xs:element ref="rpad" minOccurs="0"/> | |
867 <xs:element ref="payload"/> | |
868 </xs:all> | |
869 </xs:complexType> | |
870 </xs:element> | |
871 | |
872 <xs:element name="crypt"> | |
873 <xs:complexType> | |
874 <xs:all> | |
875 <xs:element ref="to" minOccurs="0" maxOccurs="unbounded"/> | |
876 <xs:element ref="time"/> | |
877 <xs:element ref="rpad" minOccurs="0"/> | |
878 <xs:element ref="payload"/> | |
879 </xs:all> | |
880 </xs:complexType> | |
881 </xs:element> | |
882 | |
883 <xs:element name="to"> | |
884 <xs:complexType> | |
885 <xs:attribute name="jid" type="xs:string"/> | |
886 </xs:complexType> | |
887 </xs:element> | |
888 | |
889 <xs:element name="time"> | |
890 <xs:complexType> | |
891 <xs:attribute name="stamp" type="xs:dateTime"/> | |
892 </xs:complexType> | |
893 </xs:element> | |
894 | |
895 <xs:element name="rpad" type="xs:string"/> | |
896 | |
897 <xs:element name="payload"> | |
898 <xs:complexType> | |
899 <xs:sequence> | |
900 <xs:any minOccurs="0" maxOccurs="unbounded" processContents="skip"/> | |
901 </xs:sequence> | |
902 </xs:complexType> | |
903 </xs:element> | |
904 </xs:schema> | |
905 """) | |
906 | |
907 | |
908 PUBLIC_KEYS_LIST_NODE = "urn:xmpp:openpgp:0:public-keys" | |
909 PUBLIC_KEYS_LIST_SCHEMA = xmlschema.XMLSchema("""<?xml version="1.0" encoding="utf8"?> | |
910 <xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema" | |
911 targetNamespace="urn:xmpp:openpgp:0" | |
912 xmlns="urn:xmpp:openpgp:0"> | |
913 | |
914 <xs:element name="public-keys-list"> | |
915 <xs:complexType> | |
916 <xs:sequence> | |
917 <xs:element ref="pubkey-metadata" minOccurs="0" maxOccurs="unbounded"/> | |
918 </xs:sequence> | |
919 </xs:complexType> | |
920 </xs:element> | |
921 | |
922 <xs:element name="pubkey-metadata"> | |
923 <xs:complexType> | |
924 <xs:attribute name="v4-fingerprint" type="xs:string"/> | |
925 <xs:attribute name="date" type="xs:dateTime"/> | |
926 </xs:complexType> | |
927 </xs:element> | |
928 </xs:schema> | |
929 """) | |
930 | |
931 | |
932 PUBKEY_SCHEMA = xmlschema.XMLSchema("""<?xml version="1.0" encoding="utf8"?> | |
933 <xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema" | |
934 targetNamespace="urn:xmpp:openpgp:0" | |
935 xmlns="urn:xmpp:openpgp:0"> | |
936 | |
937 <xs:element name="pubkey"> | |
938 <xs:complexType> | |
939 <xs:all> | |
940 <xs:element ref="data"/> | |
941 </xs:all> | |
942 <xs:anyAttribute processContents="skip"/> | |
943 </xs:complexType> | |
944 </xs:element> | |
945 | |
946 <xs:element name="data" type="xs:base64Binary"/> | |
947 </xs:schema> | |
948 """) | |
949 | |
950 | |
951 SECRETKEY_SCHEMA = xmlschema.XMLSchema("""<?xml version="1.0" encoding="utf8"?> | |
952 <xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema" | |
953 targetNamespace="urn:xmpp:openpgp:0" | |
954 xmlns="urn:xmpp:openpgp:0"> | |
955 | |
956 <xs:element name="secretkey" type="xs:base64Binary"/> | |
957 </xs:schema> | |
958 """) | |
959 | |
960 | |
961 DEFAULT_TRUST_MODEL_PARAM = f""" | |
962 <params> | |
963 <individual> | |
964 <category name="{PARAM_CATEGORY}" label={quoteattr(D_('Security'))}> | |
965 <param name="{PARAM_NAME}" | |
966 label={quoteattr(D_('OMEMO default trust policy'))} | |
967 type="list" security="3"> | |
968 <option value="manual" label={quoteattr(D_('Manual trust (more secure)'))} /> | |
969 <option value="btbv" | |
970 label={quoteattr(D_('Blind Trust Before Verification (more user friendly)'))} | |
971 selected="true" /> | |
972 </param> | |
973 </category> | |
974 </individual> | |
975 </params> | |
976 """ | |
977 | |
978 | |
979 def get_gpg_provider(sat: SAT, client: SatXMPPClient) -> GPGProvider: | |
980 """Get the GPG provider for a client. | |
981 | |
982 @param sat: The SAT instance. | |
983 @param client: The client. | |
984 @return: The GPG provider specifically for that client. | |
985 """ | |
986 | |
987 return GPGME_GPGProvider(str(sat.get_local_path(client, "gnupg-home"))) | |
988 | |
989 | |
990 def generate_passphrase() -> str: | |
991 """Generate a secure passphrase for symmetric encryption. | |
992 | |
993 @return: The passphrase. | |
994 """ | |
995 | |
996 return "-".join("".join( | |
997 secrets.choice("123456789ABCDEFGHIJKLMNPQRSTUVWXYZ") for __ in range(4) | |
998 ) for __ in range(6)) | |
999 | |
1000 | |
1001 # TODO: Handle the user id mess | |
1002 class XEP_0373: | |
1003 """ | |
1004 Implementation of XEP-0373: OpenPGP for XMPP under namespace ``urn:xmpp:openpgp:0``. | |
1005 """ | |
1006 | |
1007 def __init__(self, host: SAT) -> None: | |
1008 """ | |
1009 @param sat: The SAT instance. | |
1010 """ | |
1011 | |
1012 self.host = host | |
1013 | |
1014 # Add configuration option to choose between manual trust and BTBV as the trust | |
1015 # model | |
1016 host.memory.update_params(DEFAULT_TRUST_MODEL_PARAM) | |
1017 | |
1018 self.__xep_0045 = cast(Optional[XEP_0045], host.plugins.get("XEP-0045")) | |
1019 self.__xep_0060 = cast(XEP_0060, host.plugins["XEP-0060"]) | |
1020 | |
1021 self.__storage: Dict[str, persistent.LazyPersistentBinaryDict] = {} | |
1022 | |
1023 xep_0163 = cast(XEP_0163, host.plugins["XEP-0163"]) | |
1024 xep_0163.add_pep_event( | |
1025 "OX_PUBLIC_KEYS_LIST", | |
1026 PUBLIC_KEYS_LIST_NODE, | |
1027 lambda items_event, profile: defer.ensureDeferred( | |
1028 self.__on_public_keys_list_update(items_event, profile) | |
1029 ) | |
1030 ) | |
1031 | |
1032 async def profile_connecting(self, client): | |
1033 client.gpg_provider = get_gpg_provider(self.host, client) | |
1034 | |
1035 async def profile_connected( # pylint: disable=invalid-name | |
1036 self, | |
1037 client: SatXMPPClient | |
1038 ) -> None: | |
1039 """ | |
1040 @param client: The client. | |
1041 """ | |
1042 | |
1043 profile = cast(str, client.profile) | |
1044 | |
1045 if not profile in self.__storage: | |
1046 self.__storage[profile] = \ | |
1047 persistent.LazyPersistentBinaryDict("XEP-0373", client.profile) | |
1048 | |
1049 if len(self.list_secret_keys(client)) == 0: | |
1050 log.debug(f"Generating first GPG key for {client.jid.userhost()}.") | |
1051 await self.create_key(client) | |
1052 | |
1053 async def __on_public_keys_list_update( | |
1054 self, | |
1055 items_event: pubsub.ItemsEvent, | |
1056 profile: str | |
1057 ) -> None: | |
1058 """Handle public keys list updates fired by PEP. | |
1059 | |
1060 @param items_event: The event. | |
1061 @param profile: The profile this event belongs to. | |
1062 """ | |
1063 | |
1064 client = self.host.get_client(profile) | |
1065 | |
1066 sender = cast(jid.JID, items_event.sender) | |
1067 items = cast(List[domish.Element], items_event.items) | |
1068 | |
1069 if len(items) > 1: | |
1070 log.warning("Ignoring public keys list update with more than one element.") | |
1071 return | |
1072 | |
1073 item_elt = next(iter(items), None) | |
1074 if item_elt is None: | |
1075 log.debug("Ignoring empty public keys list update.") | |
1076 return | |
1077 | |
1078 public_keys_list_elt = cast( | |
1079 Optional[domish.Element], | |
1080 next(item_elt.elements(NS_OX, "public-keys-list"), None) | |
1081 ) | |
1082 | |
1083 pubkey_metadata_elts: Optional[List[domish.Element]] = None | |
1084 | |
1085 if public_keys_list_elt is not None: | |
1086 try: | |
1087 PUBLIC_KEYS_LIST_SCHEMA.validate(public_keys_list_elt.toXml()) | |
1088 except xmlschema.XMLSchemaValidationError: | |
1089 pass | |
1090 else: | |
1091 pubkey_metadata_elts = \ | |
1092 list(public_keys_list_elt.elements(NS_OX, "pubkey-metadata")) | |
1093 | |
1094 if pubkey_metadata_elts is None: | |
1095 log.warning(f"Malformed public keys list update item: {item_elt.toXml()}") | |
1096 return | |
1097 | |
1098 new_public_keys_metadata = { PublicKeyMetadata( | |
1099 fingerprint=cast(str, pubkey_metadata_elt["v4-fingerprint"]), | |
1100 timestamp=parse_datetime(cast(str, pubkey_metadata_elt["date"])) | |
1101 ) for pubkey_metadata_elt in pubkey_metadata_elts } | |
1102 | |
1103 storage_key = STR_KEY_PUBLIC_KEYS_METADATA.format(sender.userhost()) | |
1104 | |
1105 local_public_keys_metadata = cast( | |
1106 Set[PublicKeyMetadata], | |
1107 await self.__storage[profile].get(storage_key, set()) | |
1108 ) | |
1109 | |
1110 unchanged_keys = new_public_keys_metadata & local_public_keys_metadata | |
1111 changed_or_new_keys = new_public_keys_metadata - unchanged_keys | |
1112 available_keys = self.list_public_keys(client, sender) | |
1113 | |
1114 for key_metadata in changed_or_new_keys: | |
1115 # Check whether the changed or new key has been imported before | |
1116 if any(key.fingerprint == key_metadata.fingerprint for key in available_keys): | |
1117 try: | |
1118 # If it has been imported before, try to update it | |
1119 await self.import_public_key(client, sender, key_metadata.fingerprint) | |
1120 except Exception as e: | |
1121 log.warning(f"Public key import failed: {e}") | |
1122 | |
1123 # If the update fails, remove the key from the local metadata list | |
1124 # such that the update is attempted again next time | |
1125 new_public_keys_metadata.remove(key_metadata) | |
1126 | |
1127 # Check whether this update was for our account and make sure all of our keys are | |
1128 # included in the update | |
1129 if sender.userhost() == client.jid.userhost(): | |
1130 secret_keys = self.list_secret_keys(client) | |
1131 missing_keys = set(filter(lambda secret_key: all( | |
1132 key_metadata.fingerprint != secret_key.public_key.fingerprint | |
1133 for key_metadata | |
1134 in new_public_keys_metadata | |
1135 ), secret_keys)) | |
1136 | |
1137 if len(missing_keys) > 0: | |
1138 log.warning( | |
1139 "Public keys list update did not contain at least one of our keys." | |
1140 f" {new_public_keys_metadata}" | |
1141 ) | |
1142 | |
1143 for missing_key in missing_keys: | |
1144 log.warning(missing_key.public_key.fingerprint) | |
1145 new_public_keys_metadata.add(PublicKeyMetadata( | |
1146 fingerprint=missing_key.public_key.fingerprint, | |
1147 timestamp=datetime.now(timezone.utc) | |
1148 )) | |
1149 | |
1150 await self.publish_public_keys_list(client, new_public_keys_metadata) | |
1151 | |
1152 await self.__storage[profile].force(storage_key, new_public_keys_metadata) | |
1153 | |
1154 def list_public_keys(self, client: SatXMPPClient, jid: jid.JID) -> Set[GPGPublicKey]: | |
1155 """List GPG public keys available for a JID. | |
1156 | |
1157 @param client: The client to perform this operation with. | |
1158 @param jid: The JID. Can be a bare JID. | |
1159 @return: The set of public keys available for this JID. | |
1160 """ | |
1161 | |
1162 gpg_provider = get_gpg_provider(self.host, client) | |
1163 | |
1164 return gpg_provider.list_public_keys(f"xmpp:{jid.userhost()}") | |
1165 | |
1166 def list_secret_keys(self, client: SatXMPPClient) -> Set[GPGSecretKey]: | |
1167 """List GPG secret keys available for a JID. | |
1168 | |
1169 @param client: The client to perform this operation with. | |
1170 @return: The set of secret keys available for this JID. | |
1171 """ | |
1172 | |
1173 gpg_provider = get_gpg_provider(self.host, client) | |
1174 | |
1175 return gpg_provider.list_secret_keys(f"xmpp:{client.jid.userhost()}") | |
1176 | |
1177 async def create_key(self, client: SatXMPPClient) -> GPGSecretKey: | |
1178 """Create a new GPG key, capable of signing and encryption. | |
1179 | |
1180 The key is generated without password protection and without expiration. | |
1181 | |
1182 @param client: The client to perform this operation with. | |
1183 @return: The new key. | |
1184 """ | |
1185 | |
1186 gpg_provider = get_gpg_provider(self.host, client) | |
1187 | |
1188 secret_key = gpg_provider.create_key(f"xmpp:{client.jid.userhost()}") | |
1189 | |
1190 await self.publish_public_key(client, secret_key.public_key) | |
1191 | |
1192 storage_key = STR_KEY_PUBLIC_KEYS_METADATA.format(client.jid.userhost()) | |
1193 | |
1194 public_keys_list = cast( | |
1195 Set[PublicKeyMetadata], | |
1196 await self.__storage[client.profile].get(storage_key, set()) | |
1197 ) | |
1198 | |
1199 public_keys_list.add(PublicKeyMetadata( | |
1200 fingerprint=secret_key.public_key.fingerprint, | |
1201 timestamp=datetime.now(timezone.utc) | |
1202 )) | |
1203 | |
1204 await self.publish_public_keys_list(client, public_keys_list) | |
1205 | |
1206 await self.__storage[client.profile].force(storage_key, public_keys_list) | |
1207 | |
1208 return secret_key | |
1209 | |
1210 @staticmethod | |
1211 def __build_content_element( | |
1212 element_name: Literal["signcrypt", "sign", "crypt"], | |
1213 recipient_jids: Iterable[jid.JID], | |
1214 include_rpad: bool | |
1215 ) -> Tuple[domish.Element, domish.Element]: | |
1216 """Build a content element. | |
1217 | |
1218 @param element_name: The name of the content element. | |
1219 @param recipient_jids: The intended recipients of this content element. Can be | |
1220 bare JIDs. | |
1221 @param include_rpad: Whether to include random-length random-content padding. | |
1222 @return: The content element and the ``<payload/>`` element to add the stanza | |
1223 extension elements to. | |
1224 """ | |
1225 | |
1226 content_elt = domish.Element((NS_OX, element_name)) | |
1227 | |
1228 for recipient_jid in recipient_jids: | |
1229 content_elt.addElement("to")["jid"] = recipient_jid.userhost() | |
1230 | |
1231 content_elt.addElement("time")["stamp"] = format_datetime() | |
1232 | |
1233 if include_rpad: | |
1234 # XEP-0373 doesn't specify bounds for the length of the random padding. This | |
1235 # uses the bounds specified in XEP-0420 for the closely related rpad affix. | |
1236 rpad_length = secrets.randbelow(201) | |
1237 rpad_content = "".join( | |
1238 secrets.choice(string.digits + string.ascii_letters + string.punctuation) | |
1239 for __ | |
1240 in range(rpad_length) | |
1241 ) | |
1242 content_elt.addElement("rpad", content=rpad_content) | |
1243 | |
1244 payload_elt = content_elt.addElement("payload") | |
1245 | |
1246 return content_elt, payload_elt | |
1247 | |
1248 @staticmethod | |
1249 def build_signcrypt_element( | |
1250 recipient_jids: Iterable[jid.JID] | |
1251 ) -> Tuple[domish.Element, domish.Element]: | |
1252 """Build a ``<signcrypt/>`` content element. | |
1253 | |
1254 @param recipient_jids: The intended recipients of this content element. Can be | |
1255 bare JIDs. | |
1256 @return: The ``<signcrypt/>`` element and the ``<payload/>`` element to add the | |
1257 stanza extension elements to. | |
1258 """ | |
1259 | |
1260 if len(recipient_jids) == 0: | |
1261 raise ValueError("Recipient JIDs must be provided.") | |
1262 | |
1263 return XEP_0373.__build_content_element("signcrypt", recipient_jids, True) | |
1264 | |
1265 @staticmethod | |
1266 def build_sign_element( | |
1267 recipient_jids: Iterable[jid.JID], | |
1268 include_rpad: bool | |
1269 ) -> Tuple[domish.Element, domish.Element]: | |
1270 """Build a ``<sign/>`` content element. | |
1271 | |
1272 @param recipient_jids: The intended recipients of this content element. Can be | |
1273 bare JIDs. | |
1274 @param include_rpad: Whether to include random-length random-content padding, | |
1275 which is OPTIONAL for the ``<sign/>`` content element. | |
1276 @return: The ``<sign/>`` element and the ``<payload/>`` element to add the stanza | |
1277 extension elements to. | |
1278 """ | |
1279 | |
1280 if len(recipient_jids) == 0: | |
1281 raise ValueError("Recipient JIDs must be provided.") | |
1282 | |
1283 return XEP_0373.__build_content_element("sign", recipient_jids, include_rpad) | |
1284 | |
1285 @staticmethod | |
1286 def build_crypt_element( | |
1287 recipient_jids: Iterable[jid.JID] | |
1288 ) -> Tuple[domish.Element, domish.Element]: | |
1289 """Build a ``<crypt/>`` content element. | |
1290 | |
1291 @param recipient_jids: The intended recipients of this content element. Specifying | |
1292 the intended recipients is OPTIONAL for the ``<crypt/>`` content element. Can | |
1293 be bare JIDs. | |
1294 @return: The ``<crypt/>`` element and the ``<payload/>`` element to add the stanza | |
1295 extension elements to. | |
1296 """ | |
1297 | |
1298 return XEP_0373.__build_content_element("crypt", recipient_jids, True) | |
1299 | |
1300 async def build_openpgp_element( | |
1301 self, | |
1302 client: SatXMPPClient, | |
1303 content_elt: domish.Element, | |
1304 recipient_jids: Set[jid.JID] | |
1305 ) -> domish.Element: | |
1306 """Build an ``<openpgp/>`` element. | |
1307 | |
1308 @param client: The client to perform this operation with. | |
1309 @param content_elt: The content element to contain in the ``<openpgp/>`` element. | |
1310 @param recipient_jids: The recipient's JIDs. Can be bare JIDs. | |
1311 @return: The ``<openpgp/>`` element. | |
1312 """ | |
1313 | |
1314 gpg_provider = get_gpg_provider(self.host, client) | |
1315 | |
1316 # TODO: I'm not sure whether we want to sign with all keys by default or choose | |
1317 # just one key/a subset of keys to sign with. | |
1318 signing_keys = set(filter( | |
1319 lambda secret_key: gpg_provider.can_sign(secret_key.public_key), | |
1320 self.list_secret_keys(client) | |
1321 )) | |
1322 | |
1323 encryption_keys: Set[GPGPublicKey] = set() | |
1324 | |
1325 for recipient_jid in recipient_jids: | |
1326 # import all keys of the recipient | |
1327 all_public_keys = await self.import_all_public_keys(client, recipient_jid) | |
1328 | |
1329 # Filter for keys that can encrypt | |
1330 encryption_keys |= set(filter(gpg_provider.can_encrypt, all_public_keys)) | |
1331 | |
1332 # TODO: Handle trust | |
1333 | |
1334 content = content_elt.toXml().encode("utf-8") | |
1335 data: bytes | |
1336 | |
1337 if content_elt.name == "signcrypt": | |
1338 data = gpg_provider.encrypt(content, encryption_keys, signing_keys) | |
1339 elif content_elt.name == "sign": | |
1340 data = gpg_provider.sign(content, signing_keys) | |
1341 elif content_elt.name == "crypt": | |
1342 data = gpg_provider.encrypt(content, encryption_keys) | |
1343 else: | |
1344 raise ValueError(f"Unknown content element <{content_elt.name}/>") | |
1345 | |
1346 openpgp_elt = domish.Element((NS_OX, "openpgp")) | |
1347 openpgp_elt.addContent(base64.b64encode(data).decode("ASCII")) | |
1348 return openpgp_elt | |
1349 | |
1350 async def unpack_openpgp_element( | |
1351 self, | |
1352 client: SatXMPPClient, | |
1353 openpgp_elt: domish.Element, | |
1354 element_name: Literal["signcrypt", "sign", "crypt"], | |
1355 sender_jid: jid.JID | |
1356 ) -> Tuple[domish.Element, datetime]: | |
1357 """Verify, decrypt and unpack an ``<openpgp/>`` element. | |
1358 | |
1359 @param client: The client to perform this operation with. | |
1360 @param openpgp_elt: The ``<openpgp/>`` element. | |
1361 @param element_name: The name of the content element. | |
1362 @param sender_jid: The sender's JID. Can be a bare JID. | |
1363 @return: The ``<payload/>`` element containing the decrypted/verified stanza | |
1364 extension elements carried by this ``<openpgp/>`` element, and the timestamp | |
1365 contained in the content element. | |
1366 @raise exceptions.ParsingError: on syntactical verification errors. | |
1367 @raise VerificationError: on semantical verification errors accoding to XEP-0373. | |
1368 @raise DecryptionFailed: on decryption failure. | |
1369 @raise VerificationFailed: if the data could not be verified. | |
1370 | |
1371 @warning: The timestamp is not verified for plausibility; this SHOULD be done by | |
1372 the calling code. | |
1373 """ | |
1374 | |
1375 gpg_provider = get_gpg_provider(self.host, client) | |
1376 | |
1377 decryption_keys = set(filter( | |
1378 lambda secret_key: gpg_provider.can_encrypt(secret_key.public_key), | |
1379 self.list_secret_keys(client) | |
1380 )) | |
1381 | |
1382 # import all keys of the sender | |
1383 all_public_keys = await self.import_all_public_keys(client, sender_jid) | |
1384 | |
1385 # Filter for keys that can sign | |
1386 verification_keys = set(filter(gpg_provider.can_sign, all_public_keys)) | |
1387 | |
1388 # TODO: Handle trust | |
1389 | |
1390 try: | |
1391 OPENPGP_SCHEMA.validate(openpgp_elt.toXml()) | |
1392 except xmlschema.XMLSchemaValidationError as e: | |
1393 raise exceptions.ParsingError( | |
1394 "<openpgp/> element doesn't pass schema validation." | |
1395 ) from e | |
1396 | |
1397 openpgp_message = base64.b64decode(str(openpgp_elt)) | |
1398 content: bytes | |
1399 | |
1400 if element_name == "signcrypt": | |
1401 content = gpg_provider.decrypt( | |
1402 openpgp_message, | |
1403 decryption_keys, | |
1404 public_keys=verification_keys | |
1405 ) | |
1406 elif element_name == "sign": | |
1407 content = gpg_provider.verify(openpgp_message, verification_keys) | |
1408 elif element_name == "crypt": | |
1409 content = gpg_provider.decrypt(openpgp_message, decryption_keys) | |
1410 else: | |
1411 assert_never(element_name) | |
1412 | |
1413 try: | |
1414 content_elt = cast( | |
1415 domish.Element, | |
1416 xml_tools.ElementParser()(content.decode("utf-8")) | |
1417 ) | |
1418 except UnicodeDecodeError as e: | |
1419 raise exceptions.ParsingError("UTF-8 decoding error") from e | |
1420 | |
1421 try: | |
1422 CONTENT_SCHEMA.validate(content_elt.toXml()) | |
1423 except xmlschema.XMLSchemaValidationError as e: | |
1424 raise exceptions.ParsingError( | |
1425 f"<{element_name}/> element doesn't pass schema validation." | |
1426 ) from e | |
1427 | |
1428 if content_elt.name != element_name: | |
1429 raise exceptions.ParsingError(f"Not a <{element_name}/> element.") | |
1430 | |
1431 recipient_jids = \ | |
1432 { jid.JID(to_elt["jid"]) for to_elt in content_elt.elements(NS_OX, "to") } | |
1433 | |
1434 if ( | |
1435 client.jid.userhostJID() not in { jid.userhostJID() for jid in recipient_jids } | |
1436 and element_name != "crypt" | |
1437 ): | |
1438 raise VerificationError( | |
1439 f"Recipient list in <{element_name}/> element does not list our (bare)" | |
1440 f" JID." | |
1441 ) | |
1442 | |
1443 time_elt = next(content_elt.elements(NS_OX, "time")) | |
1444 | |
1445 timestamp = parse_datetime(time_elt["stamp"]) | |
1446 | |
1447 payload_elt = next(content_elt.elements(NS_OX, "payload")) | |
1448 | |
1449 return payload_elt, timestamp | |
1450 | |
1451 async def publish_public_key( | |
1452 self, | |
1453 client: SatXMPPClient, | |
1454 public_key: GPGPublicKey | |
1455 ) -> None: | |
1456 """Publish a public key. | |
1457 | |
1458 @param client: The client. | |
1459 @param public_key: The public key to publish. | |
1460 @raise XMPPInteractionFailed: if any interaction via XMPP failed. | |
1461 """ | |
1462 | |
1463 gpg_provider = get_gpg_provider(self.host, client) | |
1464 | |
1465 packet = gpg_provider.export_public_key(public_key) | |
1466 | |
1467 node = f"urn:xmpp:openpgp:0:public-keys:{public_key.fingerprint}" | |
1468 | |
1469 pubkey_elt = domish.Element((NS_OX, "pubkey")) | |
1470 | |
1471 pubkey_elt.addElement("data", content=base64.b64encode(packet).decode("ASCII")) | |
1472 | |
1473 try: | |
1474 await self.__xep_0060.send_item( | |
1475 client, | |
1476 client.jid.userhostJID(), | |
1477 node, | |
1478 pubkey_elt, | |
1479 format_datetime(), | |
1480 extra={ | |
1481 XEP_0060.EXTRA_PUBLISH_OPTIONS: { | |
1482 XEP_0060.OPT_PERSIST_ITEMS: "true", | |
1483 XEP_0060.OPT_ACCESS_MODEL: "open", | |
1484 XEP_0060.OPT_MAX_ITEMS: 1 | |
1485 }, | |
1486 # TODO: Do we really want publish_without_options here? | |
1487 XEP_0060.EXTRA_ON_PRECOND_NOT_MET: "publish_without_options" | |
1488 } | |
1489 ) | |
1490 except Exception as e: | |
1491 raise XMPPInteractionFailed("Publishing the public key failed.") from e | |
1492 | |
1493 async def import_all_public_keys( | |
1494 self, | |
1495 client: SatXMPPClient, | |
1496 entity_jid: jid.JID | |
1497 ) -> Set[GPGPublicKey]: | |
1498 """import all public keys of a JID that have not been imported before. | |
1499 | |
1500 @param client: The client. | |
1501 @param jid: The JID. Can be a bare JID. | |
1502 @return: The public keys. | |
1503 @note: Failure to import a key simply results in the key not being included in the | |
1504 result. | |
1505 """ | |
1506 | |
1507 available_public_keys = self.list_public_keys(client, entity_jid) | |
1508 | |
1509 storage_key = STR_KEY_PUBLIC_KEYS_METADATA.format(entity_jid.userhost()) | |
1510 | |
1511 public_keys_metadata = cast( | |
1512 Set[PublicKeyMetadata], | |
1513 await self.__storage[client.profile].get(storage_key, set()) | |
1514 ) | |
1515 if not public_keys_metadata: | |
1516 public_keys_metadata = await self.download_public_keys_list( | |
1517 client, entity_jid | |
1518 ) | |
1519 if not public_keys_metadata: | |
1520 raise exceptions.NotFound( | |
1521 f"Can't find public keys for {entity_jid}" | |
1522 ) | |
1523 else: | |
1524 await self.__storage[client.profile].aset( | |
1525 storage_key, public_keys_metadata | |
1526 ) | |
1527 | |
1528 | |
1529 missing_keys = set(filter(lambda public_key_metadata: all( | |
1530 public_key_metadata.fingerprint != public_key.fingerprint | |
1531 for public_key | |
1532 in available_public_keys | |
1533 ), public_keys_metadata)) | |
1534 | |
1535 for missing_key in missing_keys: | |
1536 try: | |
1537 available_public_keys.add( | |
1538 await self.import_public_key(client, entity_jid, missing_key.fingerprint) | |
1539 ) | |
1540 except Exception as e: | |
1541 log.warning( | |
1542 f"import of public key {missing_key.fingerprint} owned by" | |
1543 f" {entity_jid.userhost()} failed, ignoring: {e}" | |
1544 ) | |
1545 | |
1546 return available_public_keys | |
1547 | |
1548 async def import_public_key( | |
1549 self, | |
1550 client: SatXMPPClient, | |
1551 jid: jid.JID, | |
1552 fingerprint: str | |
1553 ) -> GPGPublicKey: | |
1554 """import a public key. | |
1555 | |
1556 @param client: The client. | |
1557 @param jid: The JID owning the public key. Can be a bare JID. | |
1558 @param fingerprint: The fingerprint of the public key. | |
1559 @return: The public key. | |
1560 @raise exceptions.NotFound: if the public key was not found. | |
1561 @raise exceptions.ParsingError: on XML-level parsing errors. | |
1562 @raise InvalidPacket: if the packet is either syntactically or semantically deemed | |
1563 invalid. | |
1564 @raise XMPPInteractionFailed: if any interaction via XMPP failed. | |
1565 """ | |
1566 | |
1567 gpg_provider = get_gpg_provider(self.host, client) | |
1568 | |
1569 node = f"urn:xmpp:openpgp:0:public-keys:{fingerprint}" | |
1570 | |
1571 try: | |
1572 items, __ = await self.__xep_0060.get_items( | |
1573 client, | |
1574 jid.userhostJID(), | |
1575 node, | |
1576 max_items=1 | |
1577 ) | |
1578 except exceptions.NotFound as e: | |
1579 raise exceptions.NotFound( | |
1580 f"No public key with fingerprint {fingerprint} published by JID" | |
1581 f" {jid.userhost()}." | |
1582 ) from e | |
1583 except Exception as e: | |
1584 raise XMPPInteractionFailed("Fetching the public keys list failed.") from e | |
1585 | |
1586 try: | |
1587 item_elt = cast(domish.Element, items[0]) | |
1588 except IndexError as e: | |
1589 raise exceptions.NotFound( | |
1590 f"No public key with fingerprint {fingerprint} published by JID" | |
1591 f" {jid.userhost()}." | |
1592 ) from e | |
1593 | |
1594 pubkey_elt = cast( | |
1595 Optional[domish.Element], | |
1596 next(item_elt.elements(NS_OX, "pubkey"), None) | |
1597 ) | |
1598 | |
1599 if pubkey_elt is None: | |
1600 raise exceptions.ParsingError( | |
1601 f"Publish-Subscribe item of JID {jid.userhost()} doesn't contain pubkey" | |
1602 f" element." | |
1603 ) | |
1604 | |
1605 try: | |
1606 PUBKEY_SCHEMA.validate(pubkey_elt.toXml()) | |
1607 except xmlschema.XMLSchemaValidationError as e: | |
1608 raise exceptions.ParsingError( | |
1609 f"Publish-Subscribe item of JID {jid.userhost()} doesn't pass pubkey" | |
1610 f" schema validation." | |
1611 ) from e | |
1612 | |
1613 public_key = gpg_provider.import_public_key(base64.b64decode(str( | |
1614 next(pubkey_elt.elements(NS_OX, "data")) | |
1615 ))) | |
1616 | |
1617 return public_key | |
1618 | |
1619 async def publish_public_keys_list( | |
1620 self, | |
1621 client: SatXMPPClient, | |
1622 public_keys_list: Iterable[PublicKeyMetadata] | |
1623 ) -> None: | |
1624 """Publish/update the own public keys list. | |
1625 | |
1626 @param client: The client. | |
1627 @param public_keys_list: The public keys list. | |
1628 @raise XMPPInteractionFailed: if any interaction via XMPP failed. | |
1629 | |
1630 @warning: All public keys referenced in the public keys list MUST be published | |
1631 beforehand. | |
1632 """ | |
1633 | |
1634 if len({ pkm.fingerprint for pkm in public_keys_list }) != len(public_keys_list): | |
1635 raise ValueError("Public keys list contains duplicate fingerprints.") | |
1636 | |
1637 node = "urn:xmpp:openpgp:0:public-keys" | |
1638 | |
1639 public_keys_list_elt = domish.Element((NS_OX, "public-keys-list")) | |
1640 | |
1641 for public_key_metadata in public_keys_list: | |
1642 pubkey_metadata_elt = public_keys_list_elt.addElement("pubkey-metadata") | |
1643 pubkey_metadata_elt["v4-fingerprint"] = public_key_metadata.fingerprint | |
1644 pubkey_metadata_elt["date"] = format_datetime(public_key_metadata.timestamp) | |
1645 | |
1646 try: | |
1647 await self.__xep_0060.send_item( | |
1648 client, | |
1649 client.jid.userhostJID(), | |
1650 node, | |
1651 public_keys_list_elt, | |
1652 item_id=XEP_0060.ID_SINGLETON, | |
1653 extra={ | |
1654 XEP_0060.EXTRA_PUBLISH_OPTIONS: { | |
1655 XEP_0060.OPT_PERSIST_ITEMS: "true", | |
1656 XEP_0060.OPT_ACCESS_MODEL: "open", | |
1657 XEP_0060.OPT_MAX_ITEMS: 1 | |
1658 }, | |
1659 # TODO: Do we really want publish_without_options here? | |
1660 XEP_0060.EXTRA_ON_PRECOND_NOT_MET: "publish_without_options" | |
1661 } | |
1662 ) | |
1663 except Exception as e: | |
1664 raise XMPPInteractionFailed("Publishing the public keys list failed.") from e | |
1665 | |
1666 async def download_public_keys_list( | |
1667 self, | |
1668 client: SatXMPPClient, | |
1669 jid: jid.JID | |
1670 ) -> Optional[Set[PublicKeyMetadata]]: | |
1671 """Download the public keys list of a JID. | |
1672 | |
1673 @param client: The client. | |
1674 @param jid: The JID. Can be a bare JID. | |
1675 @return: The public keys list or ``None`` if the JID hasn't published a public | |
1676 keys list. An empty list means the JID has published an empty list. | |
1677 @raise exceptions.ParsingError: on XML-level parsing errors. | |
1678 @raise XMPPInteractionFailed: if any interaction via XMPP failed. | |
1679 """ | |
1680 | |
1681 node = "urn:xmpp:openpgp:0:public-keys" | |
1682 | |
1683 try: | |
1684 items, __ = await self.__xep_0060.get_items( | |
1685 client, | |
1686 jid.userhostJID(), | |
1687 node, | |
1688 max_items=1 | |
1689 ) | |
1690 except exceptions.NotFound: | |
1691 return None | |
1692 except Exception as e: | |
1693 raise XMPPInteractionFailed() from e | |
1694 | |
1695 try: | |
1696 item_elt = cast(domish.Element, items[0]) | |
1697 except IndexError: | |
1698 return None | |
1699 | |
1700 public_keys_list_elt = cast( | |
1701 Optional[domish.Element], | |
1702 next(item_elt.elements(NS_OX, "public-keys-list"), None) | |
1703 ) | |
1704 | |
1705 if public_keys_list_elt is None: | |
1706 return None | |
1707 | |
1708 try: | |
1709 PUBLIC_KEYS_LIST_SCHEMA.validate(public_keys_list_elt.toXml()) | |
1710 except xmlschema.XMLSchemaValidationError as e: | |
1711 raise exceptions.ParsingError( | |
1712 f"Publish-Subscribe item of JID {jid.userhost()} doesn't pass public keys" | |
1713 f" list schema validation." | |
1714 ) from e | |
1715 | |
1716 return { | |
1717 PublicKeyMetadata( | |
1718 fingerprint=pubkey_metadata_elt["v4-fingerprint"], | |
1719 timestamp=parse_datetime(pubkey_metadata_elt["date"]) | |
1720 ) | |
1721 for pubkey_metadata_elt | |
1722 in public_keys_list_elt.elements(NS_OX, "pubkey-metadata") | |
1723 } | |
1724 | |
1725 async def __prepare_secret_key_synchronization( | |
1726 self, | |
1727 client: SatXMPPClient | |
1728 ) -> Optional[domish.Element]: | |
1729 """Prepare for secret key synchronization. | |
1730 | |
1731 Makes sure the relative protocols and protocol extensions are supported by the | |
1732 server and makes sure that the PEP node for secret synchronization exists and is | |
1733 configured correctly. The node is created if necessary. | |
1734 | |
1735 @param client: The client. | |
1736 @return: As part of the preparations, the secret key synchronization PEP node is | |
1737 fetched. The result of that fetch is returned here. | |
1738 @raise exceptions.FeatureNotFound: if the server lacks support for the required | |
1739 protocols or protocol extensions. | |
1740 @raise XMPPInteractionFailed: if any interaction via XMPP failed. | |
1741 """ | |
1742 | |
1743 try: | |
1744 infos = cast(DiscoInfo, await self.host.memory.disco.get_infos( | |
1745 client, | |
1746 client.jid.userhostJID() | |
1747 )) | |
1748 except Exception as e: | |
1749 raise XMPPInteractionFailed( | |
1750 "Error performing service discovery on the own bare JID." | |
1751 ) from e | |
1752 | |
1753 identities = cast(Dict[Tuple[str, str], str], infos.identities) | |
1754 features = cast(Set[DiscoFeature], infos.features) | |
1755 | |
1756 if ("pubsub", "pep") not in identities: | |
1757 raise exceptions.FeatureNotFound("Server doesn't support PEP.") | |
1758 | |
1759 if "http://jabber.org/protocol/pubsub#access-whitelist" not in features: | |
1760 raise exceptions.FeatureNotFound( | |
1761 "Server doesn't support the whitelist access model." | |
1762 ) | |
1763 | |
1764 persistent_items_supported = \ | |
1765 "http://jabber.org/protocol/pubsub#persistent-items" in features | |
1766 | |
1767 # TODO: persistent-items is a SHOULD, how do we handle the feature missing? | |
1768 | |
1769 node = "urn:xmpp:openpgp:0:secret-key" | |
1770 | |
1771 try: | |
1772 items, __ = await self.__xep_0060.get_items( | |
1773 client, | |
1774 client.jid.userhostJID(), | |
1775 node, | |
1776 max_items=1 | |
1777 ) | |
1778 except exceptions.NotFound: | |
1779 try: | |
1780 await self.__xep_0060.createNode( | |
1781 client, | |
1782 client.jid.userhostJID(), | |
1783 node, | |
1784 { | |
1785 XEP_0060.OPT_PERSIST_ITEMS: "true", | |
1786 XEP_0060.OPT_ACCESS_MODEL: "whitelist", | |
1787 XEP_0060.OPT_MAX_ITEMS: "1" | |
1788 } | |
1789 ) | |
1790 except Exception as e: | |
1791 raise XMPPInteractionFailed( | |
1792 "Error creating the secret key synchronization node." | |
1793 ) from e | |
1794 except Exception as e: | |
1795 raise XMPPInteractionFailed( | |
1796 "Error fetching the secret key synchronization node." | |
1797 ) from e | |
1798 | |
1799 try: | |
1800 return cast(domish.Element, items[0]) | |
1801 except IndexError: | |
1802 return None | |
1803 | |
1804 async def export_secret_keys( | |
1805 self, | |
1806 client: SatXMPPClient, | |
1807 secret_keys: Iterable[GPGSecretKey] | |
1808 ) -> str: | |
1809 """Export secret keys to synchronize them with other devices. | |
1810 | |
1811 @param client: The client. | |
1812 @param secret_keys: The secret keys to export. | |
1813 @return: The backup code needed to decrypt the exported secret keys. | |
1814 @raise exceptions.FeatureNotFound: if the server lacks support for the required | |
1815 protocols or protocol extensions. | |
1816 @raise XMPPInteractionFailed: if any interaction via XMPP failed. | |
1817 """ | |
1818 | |
1819 gpg_provider = get_gpg_provider(self.host, client) | |
1820 | |
1821 await self.__prepare_secret_key_synchronization(client) | |
1822 | |
1823 backup_code = generate_passphrase() | |
1824 | |
1825 plaintext = b"".join( | |
1826 gpg_provider.backup_secret_key(secret_key) for secret_key in secret_keys | |
1827 ) | |
1828 | |
1829 ciphertext = gpg_provider.encrypt_symmetrically(plaintext, backup_code) | |
1830 | |
1831 node = "urn:xmpp:openpgp:0:secret-key" | |
1832 | |
1833 secretkey_elt = domish.Element((NS_OX, "secretkey")) | |
1834 secretkey_elt.addContent(base64.b64encode(ciphertext).decode("ASCII")) | |
1835 | |
1836 try: | |
1837 await self.__xep_0060.send_item( | |
1838 client, | |
1839 client.jid.userhostJID(), | |
1840 node, | |
1841 secretkey_elt | |
1842 ) | |
1843 except Exception as e: | |
1844 raise XMPPInteractionFailed("Publishing the secret keys failed.") from e | |
1845 | |
1846 return backup_code | |
1847 | |
1848 async def download_secret_keys(self, client: SatXMPPClient) -> Optional[bytes]: | |
1849 """Download previously exported secret keys to import them in a second step. | |
1850 | |
1851 The downloading and importing steps are separate since a backup code is required | |
1852 for the import and it should be possible to try multiple backup codes without | |
1853 redownloading the data every time. The second half of the import procedure is | |
1854 provided by :meth:`import_secret_keys`. | |
1855 | |
1856 @param client: The client. | |
1857 @return: The encrypted secret keys previously exported, if any. | |
1858 @raise exceptions.FeatureNotFound: if the server lacks support for the required | |
1859 protocols or protocol extensions. | |
1860 @raise exceptions.ParsingError: on XML-level parsing errors. | |
1861 @raise XMPPInteractionFailed: if any interaction via XMPP failed. | |
1862 """ | |
1863 | |
1864 item_elt = await self.__prepare_secret_key_synchronization(client) | |
1865 if item_elt is None: | |
1866 return None | |
1867 | |
1868 secretkey_elt = cast( | |
1869 Optional[domish.Element], | |
1870 next(item_elt.elements(NS_OX, "secretkey"), None) | |
1871 ) | |
1872 | |
1873 if secretkey_elt is None: | |
1874 return None | |
1875 | |
1876 try: | |
1877 SECRETKEY_SCHEMA.validate(secretkey_elt.toXml()) | |
1878 except xmlschema.XMLSchemaValidationError as e: | |
1879 raise exceptions.ParsingError( | |
1880 "Publish-Subscribe item doesn't pass secretkey schema validation." | |
1881 ) from e | |
1882 | |
1883 return base64.b64decode(str(secretkey_elt)) | |
1884 | |
1885 def import_secret_keys( | |
1886 self, | |
1887 client: SatXMPPClient, | |
1888 ciphertext: bytes, | |
1889 backup_code: str | |
1890 ) -> Set[GPGSecretKey]: | |
1891 """import previously downloaded secret keys. | |
1892 | |
1893 The downloading and importing steps are separate since a backup code is required | |
1894 for the import and it should be possible to try multiple backup codes without | |
1895 redownloading the data every time. The first half of the import procedure is | |
1896 provided by :meth:`download_secret_keys`. | |
1897 | |
1898 @param client: The client to perform this operation with. | |
1899 @param ciphertext: The ciphertext, i.e. the data returned by | |
1900 :meth:`download_secret_keys`. | |
1901 @param backup_code: The backup code needed to decrypt the data. | |
1902 @raise InvalidPacket: if one of the GPG packets building the secret key data is | |
1903 either syntactically or semantically deemed invalid. | |
1904 @raise DecryptionFailed: on decryption failure. | |
1905 """ | |
1906 | |
1907 gpg_provider = get_gpg_provider(self.host, client) | |
1908 | |
1909 return gpg_provider.restore_secret_keys(gpg_provider.decrypt_symmetrically( | |
1910 ciphertext, | |
1911 backup_code | |
1912 )) | |
1913 | |
1914 @staticmethod | |
1915 def __get_joined_muc_users( | |
1916 client: SatXMPPClient, | |
1917 xep_0045: XEP_0045, | |
1918 room_jid: jid.JID | |
1919 ) -> Set[jid.JID]: | |
1920 """ | |
1921 @param client: The client. | |
1922 @param xep_0045: A MUC plugin instance. | |
1923 @param room_jid: The room JID. | |
1924 @return: A set containing the bare JIDs of the MUC participants. | |
1925 @raise InternalError: if the MUC is not joined or the entity information of a | |
1926 participant isn't available. | |
1927 """ | |
1928 # TODO: This should probably be a global helper somewhere | |
1929 | |
1930 bare_jids: Set[jid.JID] = set() | |
1931 | |
1932 try: | |
1933 room = cast(muc.Room, xep_0045.get_room(client, room_jid)) | |
1934 except exceptions.NotFound as e: | |
1935 raise exceptions.InternalError( | |
1936 "Participant list of unjoined MUC requested." | |
1937 ) from e | |
1938 | |
1939 for user in cast(Dict[str, muc.User], room.roster).values(): | |
1940 entity = cast(Optional[SatXMPPEntity], user.entity) | |
1941 if entity is None: | |
1942 raise exceptions.InternalError( | |
1943 f"Participant list of MUC requested, but the entity information of" | |
1944 f" the participant {user} is not available." | |
1945 ) | |
1946 | |
1947 bare_jids.add(entity.jid.userhostJID()) | |
1948 | |
1949 return bare_jids | |
1950 | |
1951 async def get_trust( | |
1952 self, | |
1953 client: SatXMPPClient, | |
1954 public_key: GPGPublicKey, | |
1955 owner: jid.JID | |
1956 ) -> TrustLevel: | |
1957 """Query the trust level of a public key. | |
1958 | |
1959 @param client: The client to perform this operation under. | |
1960 @param public_key: The public key. | |
1961 @param owner: The owner of the public key. Can be a bare JID. | |
1962 @return: The trust level. | |
1963 """ | |
1964 | |
1965 key = f"/trust/{owner.userhost()}/{public_key.fingerprint}" | |
1966 | |
1967 try: | |
1968 return TrustLevel(await self.__storage[client.profile][key]) | |
1969 except KeyError: | |
1970 return TrustLevel.UNDECIDED | |
1971 | |
1972 async def set_trust( | |
1973 self, | |
1974 client: SatXMPPClient, | |
1975 public_key: GPGPublicKey, | |
1976 owner: jid.JID, | |
1977 trust_level: TrustLevel | |
1978 ) -> None: | |
1979 """Set the trust level of a public key. | |
1980 | |
1981 @param client: The client to perform this operation under. | |
1982 @param public_key: The public key. | |
1983 @param owner: The owner of the public key. Can be a bare JID. | |
1984 @param trust_leve: The trust level. | |
1985 """ | |
1986 | |
1987 key = f"/trust/{owner.userhost()}/{public_key.fingerprint}" | |
1988 | |
1989 await self.__storage[client.profile].force(key, trust_level.name) | |
1990 | |
1991 async def get_trust_ui( # pylint: disable=invalid-name | |
1992 self, | |
1993 client: SatXMPPClient, | |
1994 entity: jid.JID | |
1995 ) -> xml_tools.XMLUI: | |
1996 """ | |
1997 @param client: The client. | |
1998 @param entity: The entity whose device trust levels to manage. | |
1999 @return: An XMLUI instance which opens a form to manage the trust level of all | |
2000 devices belonging to the entity. | |
2001 """ | |
2002 | |
2003 if entity.resource: | |
2004 raise ValueError("A bare JID is expected.") | |
2005 | |
2006 bare_jids: Set[jid.JID] | |
2007 if self.__xep_0045 is not None and self.__xep_0045.is_joined_room(client, entity): | |
2008 bare_jids = self.__get_joined_muc_users(client, self.__xep_0045, entity) | |
2009 else: | |
2010 bare_jids = { entity.userhostJID() } | |
2011 | |
2012 all_public_keys = list({ | |
2013 bare_jid: list(self.list_public_keys(client, bare_jid)) | |
2014 for bare_jid | |
2015 in bare_jids | |
2016 }.items()) | |
2017 | |
2018 async def callback( | |
2019 data: Any, | |
2020 profile: str # pylint: disable=unused-argument | |
2021 ) -> Dict[Never, Never]: | |
2022 """ | |
2023 @param data: The XMLUI result produces by the trust UI form. | |
2024 @param profile: The profile. | |
2025 @return: An empty dictionary. The type of the return value was chosen | |
2026 conservatively since the exact options are neither known not needed here. | |
2027 """ | |
2028 | |
2029 if C.bool(data.get("cancelled", "false")): | |
2030 return {} | |
2031 | |
2032 data_form_result = cast( | |
2033 Dict[str, str], | |
2034 xml_tools.xmlui_result_2_data_form_result(data) | |
2035 ) | |
2036 for key, value in data_form_result.items(): | |
2037 if not key.startswith("trust_"): | |
2038 continue | |
2039 | |
2040 outer_index, inner_index = key.split("_")[1:] | |
2041 | |
2042 owner, public_keys = all_public_keys[int(outer_index)] | |
2043 public_key = public_keys[int(inner_index)] | |
2044 trust = TrustLevel(value) | |
2045 | |
2046 if (await self.get_trust(client, public_key, owner)) is not trust: | |
2047 await self.set_trust(client, public_key, owner, value) | |
2048 | |
2049 return {} | |
2050 | |
2051 submit_id = self.host.register_callback(callback, with_data=True, one_shot=True) | |
2052 | |
2053 result = xml_tools.XMLUI( | |
2054 panel_type=C.XMLUI_FORM, | |
2055 title=D_("OX trust management"), | |
2056 submit_id=submit_id | |
2057 ) | |
2058 # Casting this to Any, otherwise all calls on the variable cause type errors | |
2059 # pylint: disable=no-member | |
2060 trust_ui = cast(Any, result) | |
2061 trust_ui.addText(D_( | |
2062 "This is OX trusting system. You'll see below the GPG keys of your " | |
2063 "contacts, and a list selection to trust them or not. A trusted key " | |
2064 "can read your messages in plain text, so be sure to only validate " | |
2065 "keys that you are sure are belonging to your contact. It's better " | |
2066 "to do this when you are next to your contact, so " | |
2067 "you can check the \"fingerprint\" of the key " | |
2068 "yourself. Do *not* validate a key if the fingerprint is wrong!" | |
2069 )) | |
2070 | |
2071 own_secret_keys = self.list_secret_keys(client) | |
2072 | |
2073 trust_ui.change_container("label") | |
2074 for index, secret_key in enumerate(own_secret_keys): | |
2075 trust_ui.addLabel(D_(f"Own secret key {index} fingerprint")) | |
2076 trust_ui.addText(secret_key.public_key.fingerprint) | |
2077 trust_ui.addEmpty() | |
2078 trust_ui.addEmpty() | |
2079 | |
2080 for outer_index, [ owner, public_keys ] in enumerate(all_public_keys): | |
2081 for inner_index, public_key in enumerate(public_keys): | |
2082 trust_ui.addLabel(D_("Contact")) | |
2083 trust_ui.addJid(jid.JID(owner)) | |
2084 trust_ui.addLabel(D_("Fingerprint")) | |
2085 trust_ui.addText(public_key.fingerprint) | |
2086 trust_ui.addLabel(D_("Trust this device?")) | |
2087 | |
2088 current_trust_level = await self.get_trust(client, public_key, owner) | |
2089 avaiable_trust_levels = \ | |
2090 { TrustLevel.DISTRUSTED, TrustLevel.TRUSTED, current_trust_level } | |
2091 | |
2092 trust_ui.addList( | |
2093 f"trust_{outer_index}_{inner_index}", | |
2094 options=[ trust_level.name for trust_level in avaiable_trust_levels ], | |
2095 selected=current_trust_level.name, | |
2096 styles=[ "inline" ] | |
2097 ) | |
2098 | |
2099 trust_ui.addEmpty() | |
2100 trust_ui.addEmpty() | |
2101 | |
2102 return result |