Mercurial > libervia-backend
comparison sat/plugins/plugin_xep_0373.py @ 3933:cecf45416403
plugin XEP-0373 and XEP-0374: Implementation of OX and OXIM:
GPGME is used as the GPG provider.
rel 374
author | Syndace <me@syndace.dev> |
---|---|
date | Tue, 20 Sep 2022 16:22:18 +0200 |
parents | |
children | a92eef737703 |
comparison
equal
deleted
inserted
replaced
3932:7af29260ecb8 | 3933:cecf45416403 |
---|---|
1 #!/usr/bin/env python3 | |
2 | |
3 # Libervia plugin for OpenPGP for XMPP | |
4 # Copyright (C) 2022-2022 Tim Henkes (me@syndace.dev) | |
5 | |
6 # This program is free software: you can redistribute it and/or modify | |
7 # it under the terms of the GNU Affero General Public License as published by | |
8 # the Free Software Foundation, either version 3 of the License, or | |
9 # (at your option) any later version. | |
10 | |
11 # This program is distributed in the hope that it will be useful, | |
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of | |
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |
14 # GNU Affero General Public License for more details. | |
15 | |
16 # You should have received a copy of the GNU Affero General Public License | |
17 # along with this program. If not, see <http://www.gnu.org/licenses/>. | |
18 | |
19 from abc import ABC, abstractmethod | |
20 import base64 | |
21 from datetime import datetime, timezone | |
22 import enum | |
23 import secrets | |
24 import string | |
25 from typing import Any, Dict, Iterable, List, Literal, Optional, Set, Tuple, cast | |
26 from xml.sax.saxutils import quoteattr | |
27 | |
28 from typing_extensions import Final, NamedTuple, Never, assert_never | |
29 from wokkel import muc, pubsub | |
30 from wokkel.disco import DiscoFeature, DiscoInfo | |
31 import xmlschema | |
32 | |
33 from sat.core import exceptions | |
34 from sat.core.constants import Const as C | |
35 from sat.core.core_types import SatXMPPEntity | |
36 from sat.core.i18n import _, D_ | |
37 from sat.core.log import getLogger, Logger | |
38 from sat.core.sat_main import SAT | |
39 from sat.core.xmpp import SatXMPPClient | |
40 from sat.memory import persistent | |
41 from sat.plugins.plugin_xep_0045 import XEP_0045 | |
42 from sat.plugins.plugin_xep_0060 import XEP_0060 | |
43 from sat.plugins.plugin_xep_0163 import XEP_0163 | |
44 from sat.tools.xmpp_datetime import format_datetime, parse_datetime | |
45 from sat.tools import xml_tools | |
46 from twisted.internet import defer | |
47 from twisted.words.protocols.jabber import jid | |
48 from twisted.words.xish import domish | |
49 | |
50 try: | |
51 import gpg | |
52 except ImportError as import_error: | |
53 raise exceptions.MissingModule( | |
54 "You are missing the 'gpg' package required by the OX plugin. The recommended" | |
55 " installation method is via your operating system's package manager, since the" | |
56 " version of the library has to match the version of your GnuPG installation. See" | |
57 " https://wiki.python.org/moin/GnuPrivacyGuard#Accessing_GnuPG_via_gpgme" | |
58 ) from import_error | |
59 | |
60 | |
61 __all__ = [ # pylint: disable=unused-variable | |
62 "PLUGIN_INFO", | |
63 "NS_OX", | |
64 "XEP_0373", | |
65 "VerificationError", | |
66 "XMPPInteractionFailed", | |
67 "InvalidPacket", | |
68 "DecryptionFailed", | |
69 "VerificationFailed", | |
70 "UnknownKey", | |
71 "GPGProviderError", | |
72 "GPGPublicKey", | |
73 "GPGSecretKey", | |
74 "GPGProvider", | |
75 "PublicKeyMetadata", | |
76 "gpg_provider", | |
77 "TrustLevel" | |
78 ] | |
79 | |
80 | |
81 log = cast(Logger, getLogger(__name__)) # type: ignore[no-untyped-call] | |
82 | |
83 | |
84 PLUGIN_INFO = { | |
85 C.PI_NAME: "XEP-0373", | |
86 C.PI_IMPORT_NAME: "XEP-0373", | |
87 C.PI_TYPE: "SEC", | |
88 C.PI_PROTOCOLS: [ "XEP-0373" ], | |
89 C.PI_DEPENDENCIES: [ "XEP-0060", "XEP-0163" ], | |
90 C.PI_RECOMMENDATIONS: [], | |
91 C.PI_MAIN: "XEP_0373", | |
92 C.PI_HANDLER: "no", | |
93 C.PI_DESCRIPTION: D_("Implementation of OpenPGP for XMPP"), | |
94 } | |
95 | |
96 | |
97 NS_OX: Final = "urn:xmpp:openpgp:0" | |
98 | |
99 | |
100 PARAM_CATEGORY = "Security" | |
101 PARAM_NAME = "ox_policy" | |
102 | |
103 | |
104 class VerificationError(Exception): | |
105 """ | |
106 Raised by verifying methods of :class:`XEP_0373` on semantical verification errors. | |
107 """ | |
108 | |
109 | |
110 class XMPPInteractionFailed(Exception): | |
111 """ | |
112 Raised by methods of :class:`XEP_0373` on XMPP interaction failure. The reason this | |
113 exception exists is that the exceptions raised by XMPP interactions are not properly | |
114 documented for the most part, thus all exceptions are caught and wrapped in instances | |
115 of this class. | |
116 """ | |
117 | |
118 | |
119 class InvalidPacket(ValueError): | |
120 """ | |
121 Raised by methods of :class:`GPGProvider` when an invalid packet is encountered. | |
122 """ | |
123 | |
124 | |
125 class DecryptionFailed(Exception): | |
126 """ | |
127 Raised by methods of :class:`GPGProvider` on decryption failures. | |
128 """ | |
129 | |
130 | |
131 class VerificationFailed(Exception): | |
132 """ | |
133 Raised by methods of :class:`GPGProvider` on verification failures. | |
134 """ | |
135 | |
136 | |
137 class UnknownKey(ValueError): | |
138 """ | |
139 Raised by methods of :class:`GPGProvider` when an unknown key is referenced. | |
140 """ | |
141 | |
142 | |
143 class GPGProviderError(Exception): | |
144 """ | |
145 Raised by methods of :class:`GPGProvider` on internal errors. | |
146 """ | |
147 | |
148 | |
149 class GPGPublicKey(ABC): | |
150 """ | |
151 Interface describing a GPG public key. | |
152 """ | |
153 | |
154 @property | |
155 @abstractmethod | |
156 def fingerprint(self) -> str: | |
157 """ | |
158 @return: The OpenPGP v4 fingerprint string of this public key. | |
159 """ | |
160 | |
161 | |
162 class GPGSecretKey(ABC): | |
163 """ | |
164 Interface descibing a GPG secret key. | |
165 """ | |
166 | |
167 @property | |
168 @abstractmethod | |
169 def public_key(self) -> GPGPublicKey: | |
170 """ | |
171 @return: The public key corresponding to this secret key. | |
172 """ | |
173 | |
174 | |
175 class GPGProvider(ABC): | |
176 """ | |
177 Interface describing a GPG provider, i.e. a library or framework providing GPG | |
178 encryption, signing and key management. | |
179 | |
180 All methods may raise :class:`GPGProviderError` in addition to those exception types | |
181 listed explicitly. | |
182 | |
183 # TODO: Check keys for revoked, disabled and expired everywhere and exclude those (?) | |
184 """ | |
185 | |
186 @abstractmethod | |
187 def export_public_key(self, public_key: GPGPublicKey) -> bytes: | |
188 """Export a public key in a key material packet according to RFC 4880 §5.5. | |
189 | |
190 Do not use OpenPGP's ASCII Armor. | |
191 | |
192 @param public_key: The public key to export. | |
193 @return: The packet containing the exported public key. | |
194 @raise UnknownKey: if the public key is not available. | |
195 """ | |
196 | |
197 @abstractmethod | |
198 def import_public_key(self, packet: bytes) -> GPGPublicKey: | |
199 """Import a public key from a key material packet according to RFC 4880 §5.5. | |
200 | |
201 OpenPGP's ASCII Armor is not used. | |
202 | |
203 @param packet: A packet containing an exported public key. | |
204 @return: The public key imported from the packet. | |
205 @raise InvalidPacket: if the packet is either syntactically or semantically deemed | |
206 invalid. | |
207 | |
208 @warning: Only packets of version 4 or higher may be accepted, packets below | |
209 version 4 MUST be rejected. | |
210 """ | |
211 | |
212 @abstractmethod | |
213 def backup_secret_key(self, secret_key: GPGSecretKey) -> bytes: | |
214 """Export a secret key for transfer according to RFC 4880 §11.1. | |
215 | |
216 Do not encrypt the secret data, i.e. set the octet indicating string-to-key usage | |
217 conventions to zero in the corresponding secret-key packet according to RFC 4880 | |
218 §5.5.3. Do not use OpenPGP's ASCII Armor. | |
219 | |
220 @param secret_key: The secret key to export. | |
221 @return: The binary blob containing the exported secret key. | |
222 @raise UnknownKey: if the secret key is not available. | |
223 """ | |
224 | |
225 @abstractmethod | |
226 def restore_secret_keys(self, data: bytes) -> Set[GPGSecretKey]: | |
227 """Restore secret keys exported for transfer according to RFC 4880 §11.1. | |
228 | |
229 The secret data is not encrypted, i.e. the octet indicating string-to-key usage | |
230 conventions in the corresponding secret-key packets according to RFC 4880 §5.5.3 | |
231 are set to zero. OpenPGP's ASCII Armor is not used. | |
232 | |
233 @param data: Concatenation of one or more secret keys exported for transfer. | |
234 @return: The secret keys imported from the data. | |
235 @raise InvalidPacket: if the data or one of the packets included in the data is | |
236 either syntactically or semantically deemed invalid. | |
237 | |
238 @warning: Only packets of version 4 or higher may be accepted, packets below | |
239 version 4 MUST be rejected. | |
240 """ | |
241 | |
242 @abstractmethod | |
243 def encrypt_symmetrically(self, plaintext: bytes, password: str) -> bytes: | |
244 """Encrypt data symmetrically according to RFC 4880 §5.3. | |
245 | |
246 The password is used to build a Symmetric-Key Encrypted Session Key packet which | |
247 precedes the Symmetrically Encrypted Data packet that holds the encrypted data. | |
248 | |
249 @param plaintext: The data to encrypt. | |
250 @param password: The password to encrypt the data with. | |
251 @return: The encrypted data. | |
252 """ | |
253 | |
254 @abstractmethod | |
255 def decrypt_symmetrically(self, ciphertext: bytes, password: str) -> bytes: | |
256 """Decrypt data symmetrically according to RFC 4880 §5.3. | |
257 | |
258 The ciphertext consists of a Symmetrically Encrypted Data packet that holds the | |
259 encrypted data, preceded by a Symmetric-Key Encrypted Session Key packet using the | |
260 password. | |
261 | |
262 @param ciphertext: The ciphertext. | |
263 @param password: The password to decrypt the data with. | |
264 @return: The plaintext. | |
265 @raise DecryptionFailed: on decryption failure. | |
266 """ | |
267 | |
268 @abstractmethod | |
269 def sign(self, data: bytes, secret_keys: Set[GPGSecretKey]) -> bytes: | |
270 """Sign some data. | |
271 | |
272 OpenPGP's ASCII Armor is not used. | |
273 | |
274 @param data: The data to sign. | |
275 @param secret_keys: The secret keys to sign the data with. | |
276 @return: The OpenPGP message carrying the signed data. | |
277 """ | |
278 | |
279 @abstractmethod | |
280 def sign_detached(self, data: bytes, secret_keys: Set[GPGSecretKey]) -> bytes: | |
281 """Sign some data. Create the signature detached from the data. | |
282 | |
283 OpenPGP's ASCII Armor is not used. | |
284 | |
285 @param data: The data to sign. | |
286 @param secret_keys: The secret keys to sign the data with. | |
287 @return: The OpenPGP message carrying the detached signature. | |
288 """ | |
289 | |
290 @abstractmethod | |
291 def verify(self, signed_data: bytes, public_keys: Set[GPGPublicKey]) -> bytes: | |
292 """Verify signed data. | |
293 | |
294 OpenPGP's ASCII Armor is not used. | |
295 | |
296 @param signed_data: The signed data as an OpenPGP message. | |
297 @param public_keys: The public keys to verify the signature with. | |
298 @return: The verified and unpacked data. | |
299 @raise VerificationFailed: if the data could not be verified. | |
300 | |
301 @warning: For implementors: it has to be confirmed that a valid signature by one | |
302 of the public keys is available. | |
303 """ | |
304 | |
305 @abstractmethod | |
306 def verify_detached( | |
307 self, | |
308 data: bytes, | |
309 signature: bytes, | |
310 public_keys: Set[GPGPublicKey] | |
311 ) -> None: | |
312 """Verify signed data, where the signature was created detached from the data. | |
313 | |
314 OpenPGP's ASCII Armor is not used. | |
315 | |
316 @param data: The data. | |
317 @param signature: The signature as an OpenPGP message. | |
318 @param public_keys: The public keys to verify the signature with. | |
319 @raise VerificationFailed: if the data could not be verified. | |
320 | |
321 @warning: For implementors: it has to be confirmed that a valid signature by one | |
322 of the public keys is available. | |
323 """ | |
324 | |
325 @abstractmethod | |
326 def encrypt( | |
327 self, | |
328 plaintext: bytes, | |
329 public_keys: Set[GPGPublicKey], | |
330 signing_keys: Optional[Set[GPGSecretKey]] = None | |
331 ) -> bytes: | |
332 """Encrypt and optionally sign some data. | |
333 | |
334 OpenPGP's ASCII Armor is not used. | |
335 | |
336 @param plaintext: The data to encrypt and optionally sign. | |
337 @param public_keys: The public keys to encrypt the data for. | |
338 @param signing_keys: The secret keys to sign the data with. | |
339 @return: The OpenPGP message carrying the encrypted and optionally signed data. | |
340 """ | |
341 | |
342 @abstractmethod | |
343 def decrypt( | |
344 self, | |
345 ciphertext: bytes, | |
346 secret_keys: Set[GPGSecretKey], | |
347 public_keys: Optional[Set[GPGPublicKey]] = None | |
348 ) -> bytes: | |
349 """Decrypt and optionally verify some data. | |
350 | |
351 OpenPGP's ASCII Armor is not used. | |
352 | |
353 @param ciphertext: The encrypted and optionally signed data as an OpenPGP message. | |
354 @param secret_keys: The secret keys to attempt decryption with. | |
355 @param public_keys: The public keys to verify the optional signature with. | |
356 @return: The decrypted, optionally verified and unpacked data. | |
357 @raise DecryptionFailed: on decryption failure. | |
358 @raise VerificationFailed: if the data could not be verified. | |
359 | |
360 @warning: For implementors: it has to be confirmed that the data was decrypted | |
361 using one of the secret keys and that a valid signature by one of the public | |
362 keys is available in case the data is signed. | |
363 """ | |
364 | |
365 @abstractmethod | |
366 def list_public_keys(self, user_id: str) -> Set[GPGPublicKey]: | |
367 """List public keys. | |
368 | |
369 @param user_id: The user id. | |
370 @return: The set of public keys available for this user id. | |
371 """ | |
372 | |
373 @abstractmethod | |
374 def list_secret_keys(self, user_id: str) -> Set[GPGSecretKey]: | |
375 """List secret keys. | |
376 | |
377 @param user_id: The user id. | |
378 @return: The set of secret keys available for this user id. | |
379 """ | |
380 | |
381 @abstractmethod | |
382 def can_sign(self, public_key: GPGPublicKey) -> bool: | |
383 """ | |
384 @return: Whether the public key belongs to a key pair capable of signing. | |
385 """ | |
386 | |
387 @abstractmethod | |
388 def can_encrypt(self, public_key: GPGPublicKey) -> bool: | |
389 """ | |
390 @return: Whether the public key belongs to a key pair capable of encryption. | |
391 """ | |
392 | |
393 @abstractmethod | |
394 def create_key(self, user_id: str) -> GPGSecretKey: | |
395 """Create a new GPG key, capable of signing and encryption. | |
396 | |
397 The key is generated without password protection and without expiration. If a key | |
398 with the same user id already exists, a new key is created anyway. | |
399 | |
400 @param user_id: The user id to assign to the new key. | |
401 @return: The new key. | |
402 """ | |
403 | |
404 | |
405 class GPGME_GPGPublicKey(GPGPublicKey): | |
406 """ | |
407 GPG public key implementation based on GnuPG Made Easy (GPGME). | |
408 """ | |
409 | |
410 def __init__(self, key_obj: Any) -> None: | |
411 """ | |
412 @param key_obj: The GPGME key object. | |
413 """ | |
414 | |
415 self.__key_obj = key_obj | |
416 | |
417 @property | |
418 def fingerprint(self) -> str: | |
419 return self.__key_obj.fpr | |
420 | |
421 @property | |
422 def key_obj(self) -> Any: | |
423 return self.__key_obj | |
424 | |
425 | |
426 class GPGME_GPGSecretKey(GPGSecretKey): | |
427 """ | |
428 GPG secret key implementation based on GnuPG Made Easy (GPGME). | |
429 """ | |
430 | |
431 def __init__(self, public_key: GPGME_GPGPublicKey) -> None: | |
432 """ | |
433 @param public_key: The public key corresponding to this secret key. | |
434 """ | |
435 | |
436 self.__public_key = public_key | |
437 | |
438 @property | |
439 def public_key(self) -> GPGME_GPGPublicKey: | |
440 return self.__public_key | |
441 | |
442 | |
443 class GPGME_GPGProvider(GPGProvider): | |
444 """ | |
445 GPG provider implementation based on GnuPG Made Easy (GPGME). | |
446 """ | |
447 | |
448 def __init__(self, home_dir: Optional[str] = None) -> None: | |
449 """ | |
450 @param home_dir: Optional GPG home directory path to use for all operations. | |
451 """ | |
452 | |
453 self.__home_dir = home_dir | |
454 | |
455 def export_public_key(self, public_key: GPGPublicKey) -> bytes: | |
456 assert isinstance(public_key, GPGME_GPGPublicKey) | |
457 | |
458 pattern = public_key.fingerprint | |
459 | |
460 with gpg.Context(home_dir=self.__home_dir) as c: | |
461 try: | |
462 result = c.key_export_minimal(pattern) | |
463 except gpg.errors.GPGMEError as e: | |
464 raise GPGProviderError("Internal GPGME error") from e | |
465 | |
466 if result is None: | |
467 raise UnknownKey(f"Public key {pattern} not found.") | |
468 | |
469 return result | |
470 | |
471 def import_public_key(self, packet: bytes) -> GPGPublicKey: | |
472 # TODO | |
473 # - Reject packets older than version 4 | |
474 # - Check whether it's actually a public key (through packet inspection?) | |
475 | |
476 with gpg.Context(home_dir=self.__home_dir) as c: | |
477 try: | |
478 result = c.key_import(packet) | |
479 except gpg.errors.GPGMEError as e: | |
480 # From looking at the code, `key_import` never raises. The documentation | |
481 # says it does though, so this is included for future-proofness. | |
482 raise GPGProviderError("Internal GPGME error") from e | |
483 | |
484 if not hasattr(result, "considered"): | |
485 raise InvalidPacket( | |
486 f"Data not considered for public key import: {result}" | |
487 ) | |
488 | |
489 if len(result.imports) != 1: | |
490 raise InvalidPacket( | |
491 "Public key packet does not contain exactly one public key (not" | |
492 " counting subkeys)." | |
493 ) | |
494 | |
495 try: | |
496 key_obj = c.get_key(result.imports[0].fpr, secret=False) | |
497 except gpg.errors.GPGMEError as e: | |
498 raise GPGProviderError("Internal GPGME error") from e | |
499 except gpg.errors.KeyError as e: | |
500 raise GPGProviderError("Newly imported public key not found") from e | |
501 | |
502 return GPGME_GPGPublicKey(key_obj) | |
503 | |
504 def backup_secret_key(self, secret_key: GPGSecretKey) -> bytes: | |
505 assert isinstance(secret_key, GPGME_GPGSecretKey) | |
506 # TODO | |
507 # - Handle password protection/pinentry | |
508 # - Make sure the key is exported unencrypted | |
509 | |
510 pattern = secret_key.public_key.fingerprint | |
511 | |
512 with gpg.Context(home_dir=self.__home_dir) as c: | |
513 try: | |
514 result = c.key_export_secret(pattern) | |
515 except gpg.errors.GPGMEError as e: | |
516 raise GPGProviderError("Internal GPGME error") from e | |
517 | |
518 if result is None: | |
519 raise UnknownKey(f"Secret key {pattern} not found.") | |
520 | |
521 return result | |
522 | |
523 def restore_secret_keys(self, data: bytes) -> Set[GPGSecretKey]: | |
524 # TODO | |
525 # - Reject packets older than version 4 | |
526 # - Check whether it's actually secret keys (through packet inspection?) | |
527 | |
528 with gpg.Context(home_dir=self.__home_dir) as c: | |
529 try: | |
530 result = c.key_import(data) | |
531 except gpg.errors.GPGMEError as e: | |
532 # From looking at the code, `key_import` never raises. The documentation | |
533 # says it does though, so this is included for future-proofness. | |
534 raise GPGProviderError("Internal GPGME error") from e | |
535 | |
536 if not hasattr(result, "considered"): | |
537 raise InvalidPacket( | |
538 f"Data not considered for secret key import: {result}" | |
539 ) | |
540 | |
541 if len(result.imports) == 0: | |
542 raise InvalidPacket("Secret key packet does not contain a secret key.") | |
543 | |
544 secret_keys = set() | |
545 for import_status in result.imports: | |
546 try: | |
547 key_obj = c.get_key(import_status.fpr, secret=True) | |
548 except gpg.errors.GPGMEError as e: | |
549 raise GPGProviderError("Internal GPGME error") from e | |
550 except gpg.errors.KeyError as e: | |
551 raise GPGProviderError("Newly imported secret key not found") from e | |
552 | |
553 secret_keys.add(GPGME_GPGSecretKey(GPGME_GPGPublicKey(key_obj))) | |
554 | |
555 return secret_keys | |
556 | |
557 def encrypt_symmetrically(self, plaintext: bytes, password: str) -> bytes: | |
558 with gpg.Context(home_dir=self.__home_dir) as c: | |
559 try: | |
560 ciphertext, __, __ = c.encrypt(plaintext, passphrase=password, sign=False) | |
561 except gpg.errors.GPGMEError as e: | |
562 raise GPGProviderError("Internal GPGME error") from e | |
563 | |
564 return ciphertext | |
565 | |
566 def decrypt_symmetrically(self, ciphertext: bytes, password: str) -> bytes: | |
567 with gpg.Context(home_dir=self.__home_dir) as c: | |
568 try: | |
569 plaintext, __, __ = c.decrypt( | |
570 ciphertext, | |
571 passphrase=password, | |
572 verify=False | |
573 ) | |
574 except gpg.errors.GPGMEError as e: | |
575 # TODO: Find out what kind of error is raised if the password is wrong and | |
576 # re-raise it as DecryptionFailed instead. | |
577 raise GPGProviderError("Internal GPGME error") from e | |
578 except gpg.UnsupportedAlgorithm as e: | |
579 raise DecryptionFailed("Unsupported algorithm") from e | |
580 | |
581 return plaintext | |
582 | |
583 def sign(self, data: bytes, secret_keys: Set[GPGSecretKey]) -> bytes: | |
584 signers = [] | |
585 for secret_key in secret_keys: | |
586 assert isinstance(secret_key, GPGME_GPGSecretKey) | |
587 | |
588 signers.append(secret_key.public_key.key_obj) | |
589 | |
590 with gpg.Context(home_dir=self.__home_dir, signers=signers) as c: | |
591 try: | |
592 signed_data, __ = c.sign(data) | |
593 except gpg.error.GPGMEError as e: | |
594 raise GPGProviderError("Internal GPGME error") from e | |
595 except gpg.errors.InvalidSigners as e: | |
596 raise GPGProviderError( | |
597 "At least one of the secret keys is invalid for signing" | |
598 ) from e | |
599 | |
600 return signed_data | |
601 | |
602 def sign_detached(self, data: bytes, secret_keys: Set[GPGSecretKey]) -> bytes: | |
603 signers = [] | |
604 for secret_key in secret_keys: | |
605 assert isinstance(secret_key, GPGME_GPGSecretKey) | |
606 | |
607 signers.append(secret_key.public_key.key_obj) | |
608 | |
609 with gpg.Context(home_dir=self.__home_dir, signers=signers) as c: | |
610 try: | |
611 signature, __ = c.sign(data, mode=gpg.constants.sig.mode.DETACH) | |
612 except gpg.error.GPGMEError as e: | |
613 raise GPGProviderError("Internal GPGME error") from e | |
614 except gpg.errors.InvalidSigners as e: | |
615 raise GPGProviderError( | |
616 "At least one of the secret keys is invalid for signing" | |
617 ) from e | |
618 | |
619 return signature | |
620 | |
621 def verify(self, signed_data: bytes, public_keys: Set[GPGPublicKey]) -> bytes: | |
622 with gpg.Context(home_dir=self.__home_dir) as c: | |
623 try: | |
624 data, result = c.verify(signed_data) | |
625 except gpg.errors.GPGMEError as e: | |
626 raise GPGProviderError("Internal GPGME error") from e | |
627 except gpg.errors.BadSignatures as e: | |
628 raise VerificationFailed("Bad signatures on signed data") from e | |
629 | |
630 valid_signature_found = False | |
631 for public_key in public_keys: | |
632 assert isinstance(public_key, GPGME_GPGPublicKey) | |
633 | |
634 for subkey in public_key.key_obj.subkeys: | |
635 for sig in result.signatures: | |
636 if subkey.can_sign and subkey.fpr == sig.fpr: | |
637 valid_signature_found = True | |
638 | |
639 if not valid_signature_found: | |
640 raise VerificationFailed( | |
641 "Data not signed by one of the expected public keys" | |
642 ) | |
643 | |
644 return data | |
645 | |
646 def verify_detached( | |
647 self, | |
648 data: bytes, | |
649 signature: bytes, | |
650 public_keys: Set[GPGPublicKey] | |
651 ) -> None: | |
652 with gpg.Context(home_dir=self.__home_dir) as c: | |
653 try: | |
654 __, result = c.verify(data, signature=signature) | |
655 except gpg.errors.GPGMEError as e: | |
656 raise GPGProviderError("Internal GPGME error") from e | |
657 except gpg.errors.BadSignatures as e: | |
658 raise VerificationFailed("Bad signatures on signed data") from e | |
659 | |
660 valid_signature_found = False | |
661 for public_key in public_keys: | |
662 assert isinstance(public_key, GPGME_GPGPublicKey) | |
663 | |
664 for subkey in public_key.key_obj.subkeys: | |
665 for sig in result.signatures: | |
666 if subkey.can_sign and subkey.fpr == sig.fpr: | |
667 valid_signature_found = True | |
668 | |
669 if not valid_signature_found: | |
670 raise VerificationFailed( | |
671 "Data not signed by one of the expected public keys" | |
672 ) | |
673 | |
674 def encrypt( | |
675 self, | |
676 plaintext: bytes, | |
677 public_keys: Set[GPGPublicKey], | |
678 signing_keys: Optional[Set[GPGSecretKey]] = None | |
679 ) -> bytes: | |
680 recipients = [] | |
681 for public_key in public_keys: | |
682 assert isinstance(public_key, GPGME_GPGPublicKey) | |
683 | |
684 recipients.append(public_key.key_obj) | |
685 | |
686 signers = [] | |
687 if signing_keys is not None: | |
688 for secret_key in signing_keys: | |
689 assert isinstance(secret_key, GPGME_GPGSecretKey) | |
690 | |
691 signers.append(secret_key.public_key.key_obj) | |
692 | |
693 sign = signing_keys is not None | |
694 | |
695 with gpg.Context(home_dir=self.__home_dir, signers=signers) as c: | |
696 try: | |
697 ciphertext, __, __ = c.encrypt( | |
698 plaintext, | |
699 recipients=recipients, | |
700 sign=sign, | |
701 always_trust=True, | |
702 add_encrypt_to=True | |
703 ) | |
704 except gpg.errors.GPGMEError as e: | |
705 raise GPGProviderError("Internal GPGME error") from e | |
706 except gpg.errors.InvalidRecipients as e: | |
707 raise GPGProviderError( | |
708 "At least one of the public keys is invalid for encryption" | |
709 ) from e | |
710 except gpg.errors.InvalidSigners as e: | |
711 raise GPGProviderError( | |
712 "At least one of the signing keys is invalid for signing" | |
713 ) from e | |
714 | |
715 return ciphertext | |
716 | |
717 def decrypt( | |
718 self, | |
719 ciphertext: bytes, | |
720 secret_keys: Set[GPGSecretKey], | |
721 public_keys: Optional[Set[GPGPublicKey]] = None | |
722 ) -> bytes: | |
723 verify = public_keys is not None | |
724 | |
725 with gpg.Context(home_dir=self.__home_dir) as c: | |
726 try: | |
727 plaintext, result, verify_result = c.decrypt( | |
728 ciphertext, | |
729 verify=verify | |
730 ) | |
731 except gpg.errors.GPGMEError as e: | |
732 raise GPGProviderError("Internal GPGME error") from e | |
733 except gpg.UnsupportedAlgorithm as e: | |
734 raise DecryptionFailed("Unsupported algorithm") from e | |
735 | |
736 # TODO: Check whether the data was decrypted using one of the expected secret | |
737 # keys | |
738 | |
739 if public_keys is not None: | |
740 valid_signature_found = False | |
741 for public_key in public_keys: | |
742 assert isinstance(public_key, GPGME_GPGPublicKey) | |
743 | |
744 for subkey in public_key.key_obj.subkeys: | |
745 for sig in verify_result.signatures: | |
746 if subkey.can_sign and subkey.fpr == sig.fpr: | |
747 valid_signature_found = True | |
748 | |
749 if not valid_signature_found: | |
750 raise VerificationFailed( | |
751 "Data not signed by one of the expected public keys" | |
752 ) | |
753 | |
754 return plaintext | |
755 | |
756 def list_public_keys(self, user_id: str) -> Set[GPGPublicKey]: | |
757 with gpg.Context(home_dir=self.__home_dir) as c: | |
758 try: | |
759 return { | |
760 GPGME_GPGPublicKey(key) | |
761 for key | |
762 in c.keylist(pattern=user_id, secret=False) | |
763 } | |
764 except gpg.errors.GPGMEError as e: | |
765 raise GPGProviderError("Internal GPGME error") from e | |
766 | |
767 def list_secret_keys(self, user_id: str) -> Set[GPGSecretKey]: | |
768 with gpg.Context(home_dir=self.__home_dir) as c: | |
769 try: | |
770 return { | |
771 GPGME_GPGSecretKey(GPGME_GPGPublicKey(key)) | |
772 for key | |
773 in c.keylist(pattern=user_id, secret=True) | |
774 } | |
775 except gpg.errors.GPGMEError as e: | |
776 raise GPGProviderError("Internal GPGME error") from e | |
777 | |
778 def can_sign(self, public_key: GPGPublicKey) -> bool: | |
779 assert isinstance(public_key, GPGME_GPGPublicKey) | |
780 | |
781 return any(subkey.can_sign for subkey in public_key.key_obj.subkeys) | |
782 | |
783 def can_encrypt(self, public_key: GPGPublicKey) -> bool: | |
784 assert isinstance(public_key, GPGME_GPGPublicKey) | |
785 | |
786 return any(subkey.can_encrypt for subkey in public_key.key_obj.subkeys) | |
787 | |
788 def create_key(self, user_id: str) -> GPGSecretKey: | |
789 with gpg.Context(home_dir=self.__home_dir) as c: | |
790 try: | |
791 result = c.create_key( | |
792 user_id, | |
793 expires=False, | |
794 sign=True, | |
795 encrypt=True, | |
796 certify=False, | |
797 authenticate=False, | |
798 force=True | |
799 ) | |
800 | |
801 key_obj = c.get_key(result.fpr, secret=True) | |
802 except gpg.errors.GPGMEError as e: | |
803 raise GPGProviderError("Internal GPGME error") from e | |
804 except gpg.errors.KeyError as e: | |
805 raise GPGProviderError("Newly created key not found") from e | |
806 | |
807 return GPGME_GPGSecretKey(GPGME_GPGPublicKey(key_obj)) | |
808 | |
809 | |
810 class PublicKeyMetadata(NamedTuple): | |
811 """ | |
812 Metadata about a published public key. | |
813 """ | |
814 | |
815 fingerprint: str | |
816 timestamp: datetime | |
817 | |
818 | |
819 @enum.unique | |
820 class TrustLevel(enum.Enum): | |
821 """ | |
822 The trust levels required for BTBV and manual trust. | |
823 """ | |
824 | |
825 TRUSTED: str = "TRUSTED" | |
826 BLINDLY_TRUSTED: str = "BLINDLY_TRUSTED" | |
827 UNDECIDED: str = "UNDECIDED" | |
828 DISTRUSTED: str = "DISTRUSTED" | |
829 | |
830 | |
831 OPENPGP_SCHEMA = xmlschema.XMLSchema("""<?xml version="1.0" encoding="utf8"?> | |
832 <xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema" | |
833 targetNamespace="urn:xmpp:openpgp:0" | |
834 xmlns="urn:xmpp:openpgp:0"> | |
835 | |
836 <xs:element name="openpgp" type="xs:base64Binary"/> | |
837 </xs:schema> | |
838 """) | |
839 | |
840 | |
841 # The following schema needs verion 1.1 of XML Schema, which is not supported by lxml. | |
842 # Luckily, xmlschema exists, which is a clean, well maintained, cross-platform | |
843 # implementation of XML Schema, including version 1.1. | |
844 CONTENT_SCHEMA = xmlschema.XMLSchema11("""<?xml version="1.1" encoding="utf8"?> | |
845 <xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema" | |
846 targetNamespace="urn:xmpp:openpgp:0" | |
847 xmlns="urn:xmpp:openpgp:0"> | |
848 | |
849 <xs:element name="signcrypt"> | |
850 <xs:complexType> | |
851 <xs:all> | |
852 <xs:element ref="to" maxOccurs="unbounded"/> | |
853 <xs:element ref="time"/> | |
854 <xs:element ref="rpad" minOccurs="0"/> | |
855 <xs:element ref="payload"/> | |
856 </xs:all> | |
857 </xs:complexType> | |
858 </xs:element> | |
859 | |
860 <xs:element name="sign"> | |
861 <xs:complexType> | |
862 <xs:all> | |
863 <xs:element ref="to" maxOccurs="unbounded"/> | |
864 <xs:element ref="time"/> | |
865 <xs:element ref="rpad" minOccurs="0"/> | |
866 <xs:element ref="payload"/> | |
867 </xs:all> | |
868 </xs:complexType> | |
869 </xs:element> | |
870 | |
871 <xs:element name="crypt"> | |
872 <xs:complexType> | |
873 <xs:all> | |
874 <xs:element ref="to" minOccurs="0" maxOccurs="unbounded"/> | |
875 <xs:element ref="time"/> | |
876 <xs:element ref="rpad" minOccurs="0"/> | |
877 <xs:element ref="payload"/> | |
878 </xs:all> | |
879 </xs:complexType> | |
880 </xs:element> | |
881 | |
882 <xs:element name="to"> | |
883 <xs:complexType> | |
884 <xs:attribute name="jid" type="xs:string"/> | |
885 </xs:complexType> | |
886 </xs:element> | |
887 | |
888 <xs:element name="time"> | |
889 <xs:complexType> | |
890 <xs:attribute name="stamp" type="xs:dateTime"/> | |
891 </xs:complexType> | |
892 </xs:element> | |
893 | |
894 <xs:element name="rpad" type="xs:string"/> | |
895 | |
896 <xs:element name="payload"> | |
897 <xs:complexType> | |
898 <xs:sequence> | |
899 <xs:any minOccurs="0" maxOccurs="unbounded" processContents="skip"/> | |
900 </xs:sequence> | |
901 </xs:complexType> | |
902 </xs:element> | |
903 </xs:schema> | |
904 """) | |
905 | |
906 | |
907 PUBLIC_KEYS_LIST_NODE = "urn:xmpp:openpgp:0:public-keys" | |
908 PUBLIC_KEYS_LIST_SCHEMA = xmlschema.XMLSchema("""<?xml version="1.0" encoding="utf8"?> | |
909 <xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema" | |
910 targetNamespace="urn:xmpp:openpgp:0" | |
911 xmlns="urn:xmpp:openpgp:0"> | |
912 | |
913 <xs:element name="public-keys-list"> | |
914 <xs:complexType> | |
915 <xs:sequence> | |
916 <xs:element ref="pubkey-metadata" minOccurs="0" maxOccurs="unbounded"/> | |
917 </xs:sequence> | |
918 </xs:complexType> | |
919 </xs:element> | |
920 | |
921 <xs:element name="pubkey-metadata"> | |
922 <xs:complexType> | |
923 <xs:attribute name="v4-fingerprint" type="xs:string"/> | |
924 <xs:attribute name="date" type="xs:dateTime"/> | |
925 </xs:complexType> | |
926 </xs:element> | |
927 </xs:schema> | |
928 """) | |
929 | |
930 | |
931 PUBKEY_SCHEMA = xmlschema.XMLSchema("""<?xml version="1.0" encoding="utf8"?> | |
932 <xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema" | |
933 targetNamespace="urn:xmpp:openpgp:0" | |
934 xmlns="urn:xmpp:openpgp:0"> | |
935 | |
936 <xs:element name="pubkey"> | |
937 <xs:complexType> | |
938 <xs:all> | |
939 <xs:element ref="data"/> | |
940 </xs:all> | |
941 <xs:anyAttribute processContents="skip"/> | |
942 </xs:complexType> | |
943 </xs:element> | |
944 | |
945 <xs:element name="data" type="xs:base64Binary"/> | |
946 </xs:schema> | |
947 """) | |
948 | |
949 | |
950 SECRETKEY_SCHEMA = xmlschema.XMLSchema("""<?xml version="1.0" encoding="utf8"?> | |
951 <xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema" | |
952 targetNamespace="urn:xmpp:openpgp:0" | |
953 xmlns="urn:xmpp:openpgp:0"> | |
954 | |
955 <xs:element name="secretkey" type="xs:base64Binary"/> | |
956 </xs:schema> | |
957 """) | |
958 | |
959 | |
960 DEFAULT_TRUST_MODEL_PARAM = f""" | |
961 <params> | |
962 <individual> | |
963 <category name="{PARAM_CATEGORY}" label={quoteattr(D_('Security'))}> | |
964 <param name="{PARAM_NAME}" | |
965 label={quoteattr(D_('OMEMO default trust policy'))} | |
966 type="list" security="3"> | |
967 <option value="manual" label={quoteattr(D_('Manual trust (more secure)'))} /> | |
968 <option value="btbv" | |
969 label={quoteattr(D_('Blind Trust Before Verification (more user friendly)'))} | |
970 selected="true" /> | |
971 </param> | |
972 </category> | |
973 </individual> | |
974 </params> | |
975 """ | |
976 | |
977 | |
978 def get_gpg_provider(sat: SAT, client: SatXMPPClient) -> GPGProvider: | |
979 """Get the GPG provider for a client. | |
980 | |
981 @param sat: The SAT instance. | |
982 @param client: The client. | |
983 @return: The GPG provider specifically for that client. | |
984 """ | |
985 | |
986 return GPGME_GPGProvider(str(sat.get_local_path(client, "gnupg-home"))) | |
987 | |
988 | |
989 def generate_passphrase() -> str: | |
990 """Generate a secure passphrase for symmetric encryption. | |
991 | |
992 @return: The passphrase. | |
993 """ | |
994 | |
995 return "-".join("".join( | |
996 secrets.choice("123456789ABCDEFGHIJKLMNPQRSTUVWXYZ") for __ in range(4) | |
997 ) for __ in range(6)) | |
998 | |
999 | |
1000 # TODO: Handle the user id mess | |
1001 class XEP_0373: | |
1002 """ | |
1003 Implementation of XEP-0373: OpenPGP for XMPP under namespace ``urn:xmpp:openpgp:0``. | |
1004 """ | |
1005 | |
1006 def __init__(self, sat: SAT) -> None: | |
1007 """ | |
1008 @param sat: The SAT instance. | |
1009 """ | |
1010 | |
1011 self.__sat = sat | |
1012 | |
1013 # Add configuration option to choose between manual trust and BTBV as the trust | |
1014 # model | |
1015 sat.memory.updateParams(DEFAULT_TRUST_MODEL_PARAM) | |
1016 | |
1017 self.__xep_0045 = cast(Optional[XEP_0045], sat.plugins.get("XEP-0045")) | |
1018 self.__xep_0060 = cast(XEP_0060, sat.plugins["XEP-0060"]) | |
1019 | |
1020 self.__storage: Dict[str, persistent.LazyPersistentBinaryDict] = {} | |
1021 | |
1022 xep_0163 = cast(XEP_0163, sat.plugins["XEP-0163"]) | |
1023 xep_0163.addPEPEvent( | |
1024 "OX_PUBLIC_KEYS_LIST", | |
1025 PUBLIC_KEYS_LIST_NODE, | |
1026 lambda items_event, profile: defer.ensureDeferred( | |
1027 self.__on_public_keys_list_update(items_event, profile) | |
1028 ) | |
1029 ) | |
1030 | |
1031 async def profileConnected( # pylint: disable=invalid-name | |
1032 self, | |
1033 client: SatXMPPClient | |
1034 ) -> None: | |
1035 """ | |
1036 @param client: The client. | |
1037 """ | |
1038 | |
1039 profile = cast(str, client.profile) | |
1040 | |
1041 if not profile in self.__storage: | |
1042 self.__storage[profile] = \ | |
1043 persistent.LazyPersistentBinaryDict("XEP-0373", client.profile) | |
1044 | |
1045 if len(self.list_secret_keys(client)) == 0: | |
1046 log.debug(f"Generating first GPG key for {client.jid.userhost()}.") | |
1047 await self.create_key(client) | |
1048 | |
1049 async def __on_public_keys_list_update( | |
1050 self, | |
1051 items_event: pubsub.ItemsEvent, | |
1052 profile: str | |
1053 ) -> None: | |
1054 """Handle public keys list updates fired by PEP. | |
1055 | |
1056 @param items_event: The event. | |
1057 @param profile: The profile this event belongs to. | |
1058 """ | |
1059 | |
1060 client = self.__sat.getClient(profile) | |
1061 | |
1062 sender = cast(jid.JID, items_event.sender) | |
1063 items = cast(List[domish.Element], items_event.items) | |
1064 | |
1065 if len(items) > 1: | |
1066 log.warning("Ignoring public keys list update with more than one element.") | |
1067 return | |
1068 | |
1069 item_elt = next(iter(items), None) | |
1070 if item_elt is None: | |
1071 log.debug("Ignoring empty public keys list update.") | |
1072 return | |
1073 | |
1074 public_keys_list_elt = cast( | |
1075 Optional[domish.Element], | |
1076 next(item_elt.elements(NS_OX, "public-keys-list"), None) | |
1077 ) | |
1078 | |
1079 pubkey_metadata_elts: Optional[List[domish.Element]] = None | |
1080 | |
1081 if public_keys_list_elt is not None: | |
1082 try: | |
1083 PUBLIC_KEYS_LIST_SCHEMA.validate(public_keys_list_elt.toXml()) | |
1084 except xmlschema.XMLSchemaValidationError: | |
1085 pass | |
1086 else: | |
1087 pubkey_metadata_elts = \ | |
1088 list(public_keys_list_elt.elements(NS_OX, "pubkey-metadata")) | |
1089 | |
1090 if pubkey_metadata_elts is None: | |
1091 log.warning(f"Malformed public keys list update item: {item_elt.toXml()}") | |
1092 return | |
1093 | |
1094 new_public_keys_metadata = { PublicKeyMetadata( | |
1095 fingerprint=cast(str, pubkey_metadata_elt["v4-fingerprint"]), | |
1096 timestamp=parse_datetime(cast(str, pubkey_metadata_elt["date"])) | |
1097 ) for pubkey_metadata_elt in pubkey_metadata_elts } | |
1098 | |
1099 storage_key = f"/public-keys-metadata/{sender.userhost()}" | |
1100 | |
1101 local_public_keys_metadata = cast( | |
1102 Set[PublicKeyMetadata], | |
1103 await self.__storage[profile].get(storage_key, set()) | |
1104 ) | |
1105 | |
1106 unchanged_keys = new_public_keys_metadata & local_public_keys_metadata | |
1107 changed_or_new_keys = new_public_keys_metadata - unchanged_keys | |
1108 available_keys = self.list_public_keys(client, sender) | |
1109 | |
1110 for key_metadata in changed_or_new_keys: | |
1111 # Check whether the changed or new key has been imported before | |
1112 if any(key.fingerprint == key_metadata.fingerprint for key in available_keys): | |
1113 try: | |
1114 # If it has been imported before, try to update it | |
1115 await self.import_public_key(client, sender, key_metadata.fingerprint) | |
1116 except Exception as e: | |
1117 log.warning(f"Public key import failed: {e}") | |
1118 | |
1119 # If the update fails, remove the key from the local metadata list | |
1120 # such that the update is attempted again next time | |
1121 new_public_keys_metadata.remove(key_metadata) | |
1122 | |
1123 # Check whether this update was for our account and make sure all of our keys are | |
1124 # included in the update | |
1125 if sender.userhost() == client.jid.userhost(): | |
1126 secret_keys = self.list_secret_keys(client) | |
1127 missing_keys = set(filter(lambda secret_key: all( | |
1128 key_metadata.fingerprint != secret_key.public_key.fingerprint | |
1129 for key_metadata | |
1130 in new_public_keys_metadata | |
1131 ), secret_keys)) | |
1132 | |
1133 if len(missing_keys) > 0: | |
1134 log.warning( | |
1135 "Public keys list update did not contain at least one of our keys." | |
1136 f" {new_public_keys_metadata}" | |
1137 ) | |
1138 | |
1139 for missing_key in missing_keys: | |
1140 log.warning(missing_key.public_key.fingerprint) | |
1141 new_public_keys_metadata.add(PublicKeyMetadata( | |
1142 fingerprint=missing_key.public_key.fingerprint, | |
1143 timestamp=datetime.now(timezone.utc) | |
1144 )) | |
1145 | |
1146 await self.publish_public_keys_list(client, new_public_keys_metadata) | |
1147 | |
1148 await self.__storage[profile].force(storage_key, new_public_keys_metadata) | |
1149 | |
1150 def list_public_keys(self, client: SatXMPPClient, jid: jid.JID) -> Set[GPGPublicKey]: | |
1151 """List GPG public keys available for a JID. | |
1152 | |
1153 @param client: The client to perform this operation with. | |
1154 @param jid: The JID. Can be a bare JID. | |
1155 @return: The set of public keys available for this JID. | |
1156 """ | |
1157 | |
1158 gpg_provider = get_gpg_provider(self.__sat, client) | |
1159 | |
1160 return gpg_provider.list_public_keys(f"xmpp:{jid.userhost()}") | |
1161 | |
1162 def list_secret_keys(self, client: SatXMPPClient) -> Set[GPGSecretKey]: | |
1163 """List GPG secret keys available for a JID. | |
1164 | |
1165 @param client: The client to perform this operation with. | |
1166 @return: The set of secret keys available for this JID. | |
1167 """ | |
1168 | |
1169 gpg_provider = get_gpg_provider(self.__sat, client) | |
1170 | |
1171 return gpg_provider.list_secret_keys(f"xmpp:{client.jid.userhost()}") | |
1172 | |
1173 async def create_key(self, client: SatXMPPClient) -> GPGSecretKey: | |
1174 """Create a new GPG key, capable of signing and encryption. | |
1175 | |
1176 The key is generated without password protection and without expiration. | |
1177 | |
1178 @param client: The client to perform this operation with. | |
1179 @return: The new key. | |
1180 """ | |
1181 | |
1182 gpg_provider = get_gpg_provider(self.__sat, client) | |
1183 | |
1184 secret_key = gpg_provider.create_key(f"xmpp:{client.jid.userhost()}") | |
1185 | |
1186 await self.publish_public_key(client, secret_key.public_key) | |
1187 | |
1188 storage_key = f"/public-keys-metadata/{client.jid.userhost()}" | |
1189 | |
1190 public_keys_list = cast( | |
1191 Set[PublicKeyMetadata], | |
1192 await self.__storage[client.profile].get(storage_key, set()) | |
1193 ) | |
1194 | |
1195 public_keys_list.add(PublicKeyMetadata( | |
1196 fingerprint=secret_key.public_key.fingerprint, | |
1197 timestamp=datetime.now(timezone.utc) | |
1198 )) | |
1199 | |
1200 await self.publish_public_keys_list(client, public_keys_list) | |
1201 | |
1202 await self.__storage[client.profile].force(storage_key, public_keys_list) | |
1203 | |
1204 return secret_key | |
1205 | |
1206 @staticmethod | |
1207 def __build_content_element( | |
1208 element_name: Literal["signcrypt", "sign", "crypt"], | |
1209 recipient_jids: Iterable[jid.JID], | |
1210 include_rpad: bool | |
1211 ) -> Tuple[domish.Element, domish.Element]: | |
1212 """Build a content element. | |
1213 | |
1214 @param element_name: The name of the content element. | |
1215 @param recipient_jids: The intended recipients of this content element. Can be | |
1216 bare JIDs. | |
1217 @param include_rpad: Whether to include random-length random-content padding. | |
1218 @return: The content element and the ``<payload/>`` element to add the stanza | |
1219 extension elements to. | |
1220 """ | |
1221 | |
1222 content_elt = domish.Element((NS_OX, element_name)) | |
1223 | |
1224 for recipient_jid in recipient_jids: | |
1225 content_elt.addElement("to")["jid"] = recipient_jid.userhost() | |
1226 | |
1227 content_elt.addElement("time")["stamp"] = format_datetime() | |
1228 | |
1229 if include_rpad: | |
1230 # XEP-0373 doesn't specify bounds for the length of the random padding. This | |
1231 # uses the bounds specified in XEP-0420 for the closely related rpad affix. | |
1232 rpad_length = secrets.randbelow(201) | |
1233 rpad_content = "".join( | |
1234 secrets.choice(string.digits + string.ascii_letters + string.punctuation) | |
1235 for __ | |
1236 in range(rpad_length) | |
1237 ) | |
1238 content_elt.addElement("rpad", content=rpad_content) | |
1239 | |
1240 payload_elt = content_elt.addElement("payload") | |
1241 | |
1242 return content_elt, payload_elt | |
1243 | |
1244 @staticmethod | |
1245 def build_signcrypt_element( | |
1246 recipient_jids: Iterable[jid.JID] | |
1247 ) -> Tuple[domish.Element, domish.Element]: | |
1248 """Build a ``<signcrypt/>`` content element. | |
1249 | |
1250 @param recipient_jids: The intended recipients of this content element. Can be | |
1251 bare JIDs. | |
1252 @return: The ``<signcrypt/>`` element and the ``<payload/>`` element to add the | |
1253 stanza extension elements to. | |
1254 """ | |
1255 | |
1256 if len(recipient_jids) == 0: | |
1257 raise ValueError("Recipient JIDs must be provided.") | |
1258 | |
1259 return XEP_0373.__build_content_element("signcrypt", recipient_jids, True) | |
1260 | |
1261 @staticmethod | |
1262 def build_sign_element( | |
1263 recipient_jids: Iterable[jid.JID], | |
1264 include_rpad: bool | |
1265 ) -> Tuple[domish.Element, domish.Element]: | |
1266 """Build a ``<sign/>`` content element. | |
1267 | |
1268 @param recipient_jids: The intended recipients of this content element. Can be | |
1269 bare JIDs. | |
1270 @param include_rpad: Whether to include random-length random-content padding, | |
1271 which is OPTIONAL for the ``<sign/>`` content element. | |
1272 @return: The ``<sign/>`` element and the ``<payload/>`` element to add the stanza | |
1273 extension elements to. | |
1274 """ | |
1275 | |
1276 if len(recipient_jids) == 0: | |
1277 raise ValueError("Recipient JIDs must be provided.") | |
1278 | |
1279 return XEP_0373.__build_content_element("sign", recipient_jids, include_rpad) | |
1280 | |
1281 @staticmethod | |
1282 def build_crypt_element( | |
1283 recipient_jids: Iterable[jid.JID] | |
1284 ) -> Tuple[domish.Element, domish.Element]: | |
1285 """Build a ``<crypt/>`` content element. | |
1286 | |
1287 @param recipient_jids: The intended recipients of this content element. Specifying | |
1288 the intended recipients is OPTIONAL for the ``<crypt/>`` content element. Can | |
1289 be bare JIDs. | |
1290 @return: The ``<crypt/>`` element and the ``<payload/>`` element to add the stanza | |
1291 extension elements to. | |
1292 """ | |
1293 | |
1294 return XEP_0373.__build_content_element("crypt", recipient_jids, True) | |
1295 | |
1296 async def build_openpgp_element( | |
1297 self, | |
1298 client: SatXMPPClient, | |
1299 content_elt: domish.Element, | |
1300 recipient_jids: Set[jid.JID] | |
1301 ) -> domish.Element: | |
1302 """Build an ``<openpgp/>`` element. | |
1303 | |
1304 @param client: The client to perform this operation with. | |
1305 @param content_elt: The content element to contain in the ``<openpgp/>`` element. | |
1306 @param recipient_jids: The recipient's JIDs. Can be bare JIDs. | |
1307 @return: The ``<openpgp/>`` element. | |
1308 """ | |
1309 | |
1310 gpg_provider = get_gpg_provider(self.__sat, client) | |
1311 | |
1312 # TODO: I'm not sure whether we want to sign with all keys by default or choose | |
1313 # just one key/a subset of keys to sign with. | |
1314 signing_keys = set(filter( | |
1315 lambda secret_key: gpg_provider.can_sign(secret_key.public_key), | |
1316 self.list_secret_keys(client) | |
1317 )) | |
1318 | |
1319 encryption_keys: Set[GPGPublicKey] = set() | |
1320 | |
1321 for recipient_jid in recipient_jids: | |
1322 # Import all keys of the recipient | |
1323 all_public_keys = await self.import_all_public_keys(client, recipient_jid) | |
1324 | |
1325 # Filter for keys that can encrypt | |
1326 encryption_keys |= set(filter(gpg_provider.can_encrypt, all_public_keys)) | |
1327 | |
1328 # TODO: Handle trust | |
1329 | |
1330 content = content_elt.toXml().encode("utf-8") | |
1331 data: bytes | |
1332 | |
1333 if content_elt.name == "signcrypt": | |
1334 data = gpg_provider.encrypt(content, encryption_keys, signing_keys) | |
1335 elif content_elt.name == "sign": | |
1336 data = gpg_provider.sign(content, signing_keys) | |
1337 elif content_elt.name == "crypt": | |
1338 data = gpg_provider.encrypt(content, encryption_keys) | |
1339 else: | |
1340 raise ValueError(f"Unknown content element <{content_elt.name}/>") | |
1341 | |
1342 openpgp_elt = domish.Element((NS_OX, "openpgp")) | |
1343 openpgp_elt.addContent(base64.b64encode(data).decode("ASCII")) | |
1344 return openpgp_elt | |
1345 | |
1346 async def unpack_openpgp_element( | |
1347 self, | |
1348 client: SatXMPPClient, | |
1349 openpgp_elt: domish.Element, | |
1350 element_name: Literal["signcrypt", "sign", "crypt"], | |
1351 sender_jid: jid.JID | |
1352 ) -> Tuple[domish.Element, datetime]: | |
1353 """Verify, decrypt and unpack an ``<openpgp/>`` element. | |
1354 | |
1355 @param client: The client to perform this operation with. | |
1356 @param openpgp_elt: The ``<openpgp/>`` element. | |
1357 @param element_name: The name of the content element. | |
1358 @param sender_jid: The sender's JID. Can be a bare JID. | |
1359 @return: The ``<payload/>`` element containing the decrypted/verified stanza | |
1360 extension elements carried by this ``<openpgp/>`` element, and the timestamp | |
1361 contained in the content element. | |
1362 @raise exceptions.ParsingError: on syntactical verification errors. | |
1363 @raise VerificationError: on semantical verification errors accoding to XEP-0373. | |
1364 @raise DecryptionFailed: on decryption failure. | |
1365 @raise VerificationFailed: if the data could not be verified. | |
1366 | |
1367 @warning: The timestamp is not verified for plausibility; this SHOULD be done by | |
1368 the calling code. | |
1369 """ | |
1370 | |
1371 gpg_provider = get_gpg_provider(self.__sat, client) | |
1372 | |
1373 decryption_keys = set(filter( | |
1374 lambda secret_key: gpg_provider.can_encrypt(secret_key.public_key), | |
1375 self.list_secret_keys(client) | |
1376 )) | |
1377 | |
1378 # Import all keys of the sender | |
1379 all_public_keys = await self.import_all_public_keys(client, sender_jid) | |
1380 | |
1381 # Filter for keys that can sign | |
1382 verification_keys = set(filter(gpg_provider.can_sign, all_public_keys)) | |
1383 | |
1384 # TODO: Handle trust | |
1385 | |
1386 try: | |
1387 OPENPGP_SCHEMA.validate(openpgp_elt.toXml()) | |
1388 except xmlschema.XMLSchemaValidationError as e: | |
1389 raise exceptions.ParsingError( | |
1390 "<openpgp/> element doesn't pass schema validation." | |
1391 ) from e | |
1392 | |
1393 openpgp_message = base64.b64decode(str(openpgp_elt)) | |
1394 content: bytes | |
1395 | |
1396 if element_name == "signcrypt": | |
1397 content = gpg_provider.decrypt( | |
1398 openpgp_message, | |
1399 decryption_keys, | |
1400 public_keys=verification_keys | |
1401 ) | |
1402 elif element_name == "sign": | |
1403 content = gpg_provider.verify(openpgp_message, verification_keys) | |
1404 elif element_name == "crypt": | |
1405 content = gpg_provider.decrypt(openpgp_message, decryption_keys) | |
1406 else: | |
1407 assert_never(element_name) | |
1408 | |
1409 try: | |
1410 content_elt = cast( | |
1411 domish.Element, | |
1412 xml_tools.ElementParser()(content.decode("utf-8")) | |
1413 ) | |
1414 except UnicodeDecodeError as e: | |
1415 raise exceptions.ParsingError("UTF-8 decoding error") from e | |
1416 | |
1417 try: | |
1418 CONTENT_SCHEMA.validate(content_elt.toXml()) | |
1419 except xmlschema.XMLSchemaValidationError as e: | |
1420 raise exceptions.ParsingError( | |
1421 f"<{element_name}/> element doesn't pass schema validation." | |
1422 ) from e | |
1423 | |
1424 if content_elt.name != element_name: | |
1425 raise exceptions.ParsingError(f"Not a <{element_name}/> element.") | |
1426 | |
1427 recipient_jids = \ | |
1428 { jid.JID(to_elt["jid"]) for to_elt in content_elt.elements(NS_OX, "to") } | |
1429 | |
1430 if ( | |
1431 client.jid.userhostJID() not in { jid.userhostJID() for jid in recipient_jids } | |
1432 and element_name != "crypt" | |
1433 ): | |
1434 raise VerificationError( | |
1435 f"Recipient list in <{element_name}/> element does not list our (bare)" | |
1436 f" JID." | |
1437 ) | |
1438 | |
1439 time_elt = next(content_elt.elements(NS_OX, "time")) | |
1440 | |
1441 timestamp = parse_datetime(time_elt["stamp"]) | |
1442 | |
1443 payload_elt = next(content_elt.elements(NS_OX, "payload")) | |
1444 | |
1445 return payload_elt, timestamp | |
1446 | |
1447 async def publish_public_key( | |
1448 self, | |
1449 client: SatXMPPClient, | |
1450 public_key: GPGPublicKey | |
1451 ) -> None: | |
1452 """Publish a public key. | |
1453 | |
1454 @param client: The client. | |
1455 @param public_key: The public key to publish. | |
1456 @raise XMPPInteractionFailed: if any interaction via XMPP failed. | |
1457 """ | |
1458 | |
1459 gpg_provider = get_gpg_provider(self.__sat, client) | |
1460 | |
1461 packet = gpg_provider.export_public_key(public_key) | |
1462 | |
1463 node = f"urn:xmpp:openpgp:0:public-keys:{public_key.fingerprint}" | |
1464 | |
1465 pubkey_elt = domish.Element((NS_OX, "pubkey")) | |
1466 | |
1467 pubkey_elt.addElement("data", content=base64.b64encode(packet).decode("ASCII")) | |
1468 | |
1469 try: | |
1470 await self.__xep_0060.sendItem( | |
1471 client, | |
1472 client.jid.userhostJID(), | |
1473 node, | |
1474 pubkey_elt, | |
1475 format_datetime(), | |
1476 extra={ | |
1477 XEP_0060.EXTRA_PUBLISH_OPTIONS: { | |
1478 XEP_0060.OPT_PERSIST_ITEMS: "true", | |
1479 XEP_0060.OPT_ACCESS_MODEL: "open", | |
1480 XEP_0060.OPT_MAX_ITEMS: 1 | |
1481 }, | |
1482 # TODO: Do we really want publish_without_options here? | |
1483 XEP_0060.EXTRA_ON_PRECOND_NOT_MET: "publish_without_options" | |
1484 } | |
1485 ) | |
1486 except Exception as e: | |
1487 raise XMPPInteractionFailed("Publishing the public key failed.") from e | |
1488 | |
1489 async def import_all_public_keys( | |
1490 self, | |
1491 client: SatXMPPClient, | |
1492 jid: jid.JID | |
1493 ) -> Set[GPGPublicKey]: | |
1494 """Import all public keys of a JID that have not been imported before. | |
1495 | |
1496 @param client: The client. | |
1497 @param jid: The JID. Can be a bare JID. | |
1498 @return: The public keys. | |
1499 @note: Failure to import a key simply results in the key not being included in the | |
1500 result. | |
1501 """ | |
1502 | |
1503 available_public_keys = self.list_public_keys(client, jid) | |
1504 | |
1505 storage_key = f"/public-keys-metadata/{jid.userhost()}" | |
1506 | |
1507 public_keys_metadata = cast( | |
1508 Set[PublicKeyMetadata], | |
1509 await self.__storage[client.profile].get(storage_key, set()) | |
1510 ) | |
1511 | |
1512 missing_keys = set(filter(lambda public_key_metadata: all( | |
1513 public_key_metadata.fingerprint != public_key.fingerprint | |
1514 for public_key | |
1515 in available_public_keys | |
1516 ), public_keys_metadata)) | |
1517 | |
1518 for missing_key in missing_keys: | |
1519 try: | |
1520 available_public_keys.add( | |
1521 await self.import_public_key(client, jid, missing_key.fingerprint) | |
1522 ) | |
1523 except Exception as e: | |
1524 log.warning( | |
1525 f"Import of public key {missing_key.fingerprint} owned by" | |
1526 f" {jid.userhost()} failed, ignoring: {e}" | |
1527 ) | |
1528 | |
1529 return available_public_keys | |
1530 | |
1531 async def import_public_key( | |
1532 self, | |
1533 client: SatXMPPClient, | |
1534 jid: jid.JID, | |
1535 fingerprint: str | |
1536 ) -> GPGPublicKey: | |
1537 """Import a public key. | |
1538 | |
1539 @param client: The client. | |
1540 @param jid: The JID owning the public key. Can be a bare JID. | |
1541 @param fingerprint: The fingerprint of the public key. | |
1542 @return: The public key. | |
1543 @raise exceptions.NotFound: if the public key was not found. | |
1544 @raise exceptions.ParsingError: on XML-level parsing errors. | |
1545 @raise InvalidPacket: if the packet is either syntactically or semantically deemed | |
1546 invalid. | |
1547 @raise XMPPInteractionFailed: if any interaction via XMPP failed. | |
1548 """ | |
1549 | |
1550 gpg_provider = get_gpg_provider(self.__sat, client) | |
1551 | |
1552 node = f"urn:xmpp:openpgp:0:public-keys:{fingerprint}" | |
1553 | |
1554 try: | |
1555 items, __ = await self.__xep_0060.getItems( | |
1556 client, | |
1557 jid.userhostJID(), | |
1558 node, | |
1559 max_items=1 | |
1560 ) | |
1561 except exceptions.NotFound as e: | |
1562 raise exceptions.NotFound( | |
1563 f"No public key with fingerprint {fingerprint} published by JID" | |
1564 f" {jid.userhost()}." | |
1565 ) from e | |
1566 except Exception as e: | |
1567 raise XMPPInteractionFailed("Fetching the public keys list failed.") from e | |
1568 | |
1569 try: | |
1570 item_elt = cast(domish.Element, items[0]) | |
1571 except IndexError as e: | |
1572 raise exceptions.NotFound( | |
1573 f"No public key with fingerprint {fingerprint} published by JID" | |
1574 f" {jid.userhost()}." | |
1575 ) from e | |
1576 | |
1577 pubkey_elt = cast( | |
1578 Optional[domish.Element], | |
1579 next(item_elt.elements(NS_OX, "pubkey"), None) | |
1580 ) | |
1581 | |
1582 if pubkey_elt is None: | |
1583 raise exceptions.ParsingError( | |
1584 f"Publish-Subscribe item of JID {jid.userhost()} doesn't contain pubkey" | |
1585 f" element." | |
1586 ) | |
1587 | |
1588 try: | |
1589 PUBKEY_SCHEMA.validate(pubkey_elt.toXml()) | |
1590 except xmlschema.XMLSchemaValidationError as e: | |
1591 raise exceptions.ParsingError( | |
1592 f"Publish-Subscribe item of JID {jid.userhost()} doesn't pass pubkey" | |
1593 f" schema validation." | |
1594 ) from e | |
1595 | |
1596 public_key = gpg_provider.import_public_key(base64.b64decode(str( | |
1597 next(pubkey_elt.elements(NS_OX, "data")) | |
1598 ))) | |
1599 | |
1600 return public_key | |
1601 | |
1602 async def publish_public_keys_list( | |
1603 self, | |
1604 client: SatXMPPClient, | |
1605 public_keys_list: Iterable[PublicKeyMetadata] | |
1606 ) -> None: | |
1607 """Publish/update the own public keys list. | |
1608 | |
1609 @param client: The client. | |
1610 @param public_keys_list: The public keys list. | |
1611 @raise XMPPInteractionFailed: if any interaction via XMPP failed. | |
1612 | |
1613 @warning: All public keys referenced in the public keys list MUST be published | |
1614 beforehand. | |
1615 """ | |
1616 | |
1617 if len({ pkm.fingerprint for pkm in public_keys_list }) != len(public_keys_list): | |
1618 raise ValueError("Public keys list contains duplicate fingerprints.") | |
1619 | |
1620 node = "urn:xmpp:openpgp:0:public-keys" | |
1621 | |
1622 public_keys_list_elt = domish.Element((NS_OX, "public-keys-list")) | |
1623 | |
1624 for public_key_metadata in public_keys_list: | |
1625 pubkey_metadata_elt = public_keys_list_elt.addElement("pubkey-metadata") | |
1626 pubkey_metadata_elt["v4-fingerprint"] = public_key_metadata.fingerprint | |
1627 pubkey_metadata_elt["date"] = format_datetime(public_key_metadata.timestamp) | |
1628 | |
1629 try: | |
1630 await self.__xep_0060.sendItem( | |
1631 client, | |
1632 client.jid.userhostJID(), | |
1633 node, | |
1634 public_keys_list_elt, | |
1635 item_id=XEP_0060.ID_SINGLETON, | |
1636 extra={ | |
1637 XEP_0060.EXTRA_PUBLISH_OPTIONS: { | |
1638 XEP_0060.OPT_PERSIST_ITEMS: "true", | |
1639 XEP_0060.OPT_ACCESS_MODEL: "open", | |
1640 XEP_0060.OPT_MAX_ITEMS: 1 | |
1641 }, | |
1642 # TODO: Do we really want publish_without_options here? | |
1643 XEP_0060.EXTRA_ON_PRECOND_NOT_MET: "publish_without_options" | |
1644 } | |
1645 ) | |
1646 except Exception as e: | |
1647 raise XMPPInteractionFailed("Publishing the public keys list failed.") from e | |
1648 | |
1649 async def download_public_keys_list( | |
1650 self, | |
1651 client: SatXMPPClient, | |
1652 jid: jid.JID | |
1653 ) -> Optional[Set[PublicKeyMetadata]]: | |
1654 """Download the public keys list of a JID. | |
1655 | |
1656 @param client: The client. | |
1657 @param jid: The JID. Can be a bare JID. | |
1658 @return: The public keys list or ``None`` if the JID hasn't published a public | |
1659 keys list. An empty list means the JID has published an empty list. | |
1660 @raise exceptions.ParsingError: on XML-level parsing errors. | |
1661 @raise XMPPInteractionFailed: if any interaction via XMPP failed. | |
1662 """ | |
1663 | |
1664 node = "urn:xmpp:openpgp:0:public-keys" | |
1665 | |
1666 try: | |
1667 items, __ = await self.__xep_0060.getItems( | |
1668 client, | |
1669 jid.userhostJID(), | |
1670 node, | |
1671 max_items=1 | |
1672 ) | |
1673 except exceptions.NotFound: | |
1674 return None | |
1675 except Exception as e: | |
1676 raise XMPPInteractionFailed() from e | |
1677 | |
1678 try: | |
1679 item_elt = cast(domish.Element, items[0]) | |
1680 except IndexError: | |
1681 return None | |
1682 | |
1683 public_keys_list_elt = cast( | |
1684 Optional[domish.Element], | |
1685 next(item_elt.elements(NS_OX, "public-keys-list"), None) | |
1686 ) | |
1687 | |
1688 if public_keys_list_elt is None: | |
1689 return None | |
1690 | |
1691 try: | |
1692 PUBLIC_KEYS_LIST_SCHEMA.validate(public_keys_list_elt.toXml()) | |
1693 except xmlschema.XMLSchemaValidationError as e: | |
1694 raise exceptions.ParsingError( | |
1695 f"Publish-Subscribe item of JID {jid.userhost()} doesn't pass public keys" | |
1696 f" list schema validation." | |
1697 ) from e | |
1698 | |
1699 return { | |
1700 PublicKeyMetadata( | |
1701 fingerprint=pubkey_metadata_elt["v4-fingerprint"], | |
1702 timestamp=parse_datetime(pubkey_metadata_elt["date"]) | |
1703 ) | |
1704 for pubkey_metadata_elt | |
1705 in public_keys_list_elt.elements(NS_OX, "pubkey-metadata") | |
1706 } | |
1707 | |
1708 async def __prepare_secret_key_synchronization( | |
1709 self, | |
1710 client: SatXMPPClient | |
1711 ) -> Optional[domish.Element]: | |
1712 """Prepare for secret key synchronization. | |
1713 | |
1714 Makes sure the relative protocols and protocol extensions are supported by the | |
1715 server and makes sure that the PEP node for secret synchronization exists and is | |
1716 configured correctly. The node is created if necessary. | |
1717 | |
1718 @param client: The client. | |
1719 @return: As part of the preparations, the secret key synchronization PEP node is | |
1720 fetched. The result of that fetch is returned here. | |
1721 @raise exceptions.FeatureNotFound: if the server lacks support for the required | |
1722 protocols or protocol extensions. | |
1723 @raise XMPPInteractionFailed: if any interaction via XMPP failed. | |
1724 """ | |
1725 | |
1726 try: | |
1727 infos = cast(DiscoInfo, await self.__sat.memory.disco.getInfos( | |
1728 client, | |
1729 client.jid.userhostJID() | |
1730 )) | |
1731 except Exception as e: | |
1732 raise XMPPInteractionFailed( | |
1733 "Error performing service discovery on the own bare JID." | |
1734 ) from e | |
1735 | |
1736 identities = cast(Dict[Tuple[str, str], str], infos.identities) | |
1737 features = cast(Set[DiscoFeature], infos.features) | |
1738 | |
1739 if ("pubsub", "pep") not in identities: | |
1740 raise exceptions.FeatureNotFound("Server doesn't support PEP.") | |
1741 | |
1742 if "http://jabber.org/protocol/pubsub#access-whitelist" not in features: | |
1743 raise exceptions.FeatureNotFound( | |
1744 "Server doesn't support the whitelist access model." | |
1745 ) | |
1746 | |
1747 persistent_items_supported = \ | |
1748 "http://jabber.org/protocol/pubsub#persistent-items" in features | |
1749 | |
1750 # TODO: persistent-items is a SHOULD, how do we handle the feature missing? | |
1751 | |
1752 node = "urn:xmpp:openpgp:0:secret-key" | |
1753 | |
1754 try: | |
1755 items, __ = await self.__xep_0060.getItems( | |
1756 client, | |
1757 client.jid.userhostJID(), | |
1758 node, | |
1759 max_items=1 | |
1760 ) | |
1761 except exceptions.NotFound: | |
1762 try: | |
1763 await self.__xep_0060.createNode( | |
1764 client, | |
1765 client.jid.userhostJID(), | |
1766 node, | |
1767 { | |
1768 XEP_0060.OPT_PERSIST_ITEMS: "true", | |
1769 XEP_0060.OPT_ACCESS_MODEL: "whitelist", | |
1770 XEP_0060.OPT_MAX_ITEMS: "1" | |
1771 } | |
1772 ) | |
1773 except Exception as e: | |
1774 raise XMPPInteractionFailed( | |
1775 "Error creating the secret key synchronization node." | |
1776 ) from e | |
1777 except Exception as e: | |
1778 raise XMPPInteractionFailed( | |
1779 "Error fetching the secret key synchronization node." | |
1780 ) from e | |
1781 | |
1782 try: | |
1783 return cast(domish.Element, items[0]) | |
1784 except IndexError: | |
1785 return None | |
1786 | |
1787 async def export_secret_keys( | |
1788 self, | |
1789 client: SatXMPPClient, | |
1790 secret_keys: Iterable[GPGSecretKey] | |
1791 ) -> str: | |
1792 """Export secret keys to synchronize them with other devices. | |
1793 | |
1794 @param client: The client. | |
1795 @param secret_keys: The secret keys to export. | |
1796 @return: The backup code needed to decrypt the exported secret keys. | |
1797 @raise exceptions.FeatureNotFound: if the server lacks support for the required | |
1798 protocols or protocol extensions. | |
1799 @raise XMPPInteractionFailed: if any interaction via XMPP failed. | |
1800 """ | |
1801 | |
1802 gpg_provider = get_gpg_provider(self.__sat, client) | |
1803 | |
1804 await self.__prepare_secret_key_synchronization(client) | |
1805 | |
1806 backup_code = generate_passphrase() | |
1807 | |
1808 plaintext = b"".join( | |
1809 gpg_provider.backup_secret_key(secret_key) for secret_key in secret_keys | |
1810 ) | |
1811 | |
1812 ciphertext = gpg_provider.encrypt_symmetrically(plaintext, backup_code) | |
1813 | |
1814 node = "urn:xmpp:openpgp:0:secret-key" | |
1815 | |
1816 secretkey_elt = domish.Element((NS_OX, "secretkey")) | |
1817 secretkey_elt.addContent(base64.b64encode(ciphertext).decode("ASCII")) | |
1818 | |
1819 try: | |
1820 await self.__xep_0060.sendItem( | |
1821 client, | |
1822 client.jid.userhostJID(), | |
1823 node, | |
1824 secretkey_elt | |
1825 ) | |
1826 except Exception as e: | |
1827 raise XMPPInteractionFailed("Publishing the secret keys failed.") from e | |
1828 | |
1829 return backup_code | |
1830 | |
1831 async def download_secret_keys(self, client: SatXMPPClient) -> Optional[bytes]: | |
1832 """Download previously exported secret keys to import them in a second step. | |
1833 | |
1834 The downloading and importing steps are separate since a backup code is required | |
1835 for the import and it should be possible to try multiple backup codes without | |
1836 redownloading the data every time. The second half of the import procedure is | |
1837 provided by :meth:`import_secret_keys`. | |
1838 | |
1839 @param client: The client. | |
1840 @return: The encrypted secret keys previously exported, if any. | |
1841 @raise exceptions.FeatureNotFound: if the server lacks support for the required | |
1842 protocols or protocol extensions. | |
1843 @raise exceptions.ParsingError: on XML-level parsing errors. | |
1844 @raise XMPPInteractionFailed: if any interaction via XMPP failed. | |
1845 """ | |
1846 | |
1847 item_elt = await self.__prepare_secret_key_synchronization(client) | |
1848 if item_elt is None: | |
1849 return None | |
1850 | |
1851 secretkey_elt = cast( | |
1852 Optional[domish.Element], | |
1853 next(item_elt.elements(NS_OX, "secretkey"), None) | |
1854 ) | |
1855 | |
1856 if secretkey_elt is None: | |
1857 return None | |
1858 | |
1859 try: | |
1860 SECRETKEY_SCHEMA.validate(secretkey_elt.toXml()) | |
1861 except xmlschema.XMLSchemaValidationError as e: | |
1862 raise exceptions.ParsingError( | |
1863 "Publish-Subscribe item doesn't pass secretkey schema validation." | |
1864 ) from e | |
1865 | |
1866 return base64.b64decode(str(secretkey_elt)) | |
1867 | |
1868 def import_secret_keys( | |
1869 self, | |
1870 client: SatXMPPClient, | |
1871 ciphertext: bytes, | |
1872 backup_code: str | |
1873 ) -> Set[GPGSecretKey]: | |
1874 """Import previously downloaded secret keys. | |
1875 | |
1876 The downloading and importing steps are separate since a backup code is required | |
1877 for the import and it should be possible to try multiple backup codes without | |
1878 redownloading the data every time. The first half of the import procedure is | |
1879 provided by :meth:`download_secret_keys`. | |
1880 | |
1881 @param client: The client to perform this operation with. | |
1882 @param ciphertext: The ciphertext, i.e. the data returned by | |
1883 :meth:`download_secret_keys`. | |
1884 @param backup_code: The backup code needed to decrypt the data. | |
1885 @raise InvalidPacket: if one of the GPG packets building the secret key data is | |
1886 either syntactically or semantically deemed invalid. | |
1887 @raise DecryptionFailed: on decryption failure. | |
1888 """ | |
1889 | |
1890 gpg_provider = get_gpg_provider(self.__sat, client) | |
1891 | |
1892 return gpg_provider.restore_secret_keys(gpg_provider.decrypt_symmetrically( | |
1893 ciphertext, | |
1894 backup_code | |
1895 )) | |
1896 | |
1897 @staticmethod | |
1898 def __get_joined_muc_users( | |
1899 client: SatXMPPClient, | |
1900 xep_0045: XEP_0045, | |
1901 room_jid: jid.JID | |
1902 ) -> Set[jid.JID]: | |
1903 """ | |
1904 @param client: The client. | |
1905 @param xep_0045: A MUC plugin instance. | |
1906 @param room_jid: The room JID. | |
1907 @return: A set containing the bare JIDs of the MUC participants. | |
1908 @raise InternalError: if the MUC is not joined or the entity information of a | |
1909 participant isn't available. | |
1910 """ | |
1911 # TODO: This should probably be a global helper somewhere | |
1912 | |
1913 bare_jids: Set[jid.JID] = set() | |
1914 | |
1915 try: | |
1916 room = cast(muc.Room, xep_0045.getRoom(client, room_jid)) | |
1917 except exceptions.NotFound as e: | |
1918 raise exceptions.InternalError( | |
1919 "Participant list of unjoined MUC requested." | |
1920 ) from e | |
1921 | |
1922 for user in cast(Dict[str, muc.User], room.roster).values(): | |
1923 entity = cast(Optional[SatXMPPEntity], user.entity) | |
1924 if entity is None: | |
1925 raise exceptions.InternalError( | |
1926 f"Participant list of MUC requested, but the entity information of" | |
1927 f" the participant {user} is not available." | |
1928 ) | |
1929 | |
1930 bare_jids.add(entity.jid.userhostJID()) | |
1931 | |
1932 return bare_jids | |
1933 | |
1934 async def get_trust( | |
1935 self, | |
1936 client: SatXMPPClient, | |
1937 public_key: GPGPublicKey, | |
1938 owner: jid.JID | |
1939 ) -> TrustLevel: | |
1940 """Query the trust level of a public key. | |
1941 | |
1942 @param client: The client to perform this operation under. | |
1943 @param public_key: The public key. | |
1944 @param owner: The owner of the public key. Can be a bare JID. | |
1945 @return: The trust level. | |
1946 """ | |
1947 | |
1948 key = f"/trust/{owner.userhost()}/{public_key.fingerprint}" | |
1949 | |
1950 try: | |
1951 return TrustLevel(await self.__storage[client.profile][key]) | |
1952 except KeyError: | |
1953 return TrustLevel.UNDECIDED | |
1954 | |
1955 async def set_trust( | |
1956 self, | |
1957 client: SatXMPPClient, | |
1958 public_key: GPGPublicKey, | |
1959 owner: jid.JID, | |
1960 trust_level: TrustLevel | |
1961 ) -> None: | |
1962 """Set the trust level of a public key. | |
1963 | |
1964 @param client: The client to perform this operation under. | |
1965 @param public_key: The public key. | |
1966 @param owner: The owner of the public key. Can be a bare JID. | |
1967 @param trust_leve: The trust level. | |
1968 """ | |
1969 | |
1970 key = f"/trust/{owner.userhost()}/{public_key.fingerprint}" | |
1971 | |
1972 await self.__storage[client.profile].force(key, trust_level.name) | |
1973 | |
1974 async def getTrustUI( # pylint: disable=invalid-name | |
1975 self, | |
1976 client: SatXMPPClient, | |
1977 entity: jid.JID | |
1978 ) -> xml_tools.XMLUI: | |
1979 """ | |
1980 @param client: The client. | |
1981 @param entity: The entity whose device trust levels to manage. | |
1982 @return: An XMLUI instance which opens a form to manage the trust level of all | |
1983 devices belonging to the entity. | |
1984 """ | |
1985 | |
1986 if entity.resource: | |
1987 raise ValueError("A bare JID is expected.") | |
1988 | |
1989 bare_jids: Set[jid.JID] | |
1990 if self.__xep_0045 is not None and self.__xep_0045.isJoinedRoom(client, entity): | |
1991 bare_jids = self.__get_joined_muc_users(client, self.__xep_0045, entity) | |
1992 else: | |
1993 bare_jids = { entity.userhostJID() } | |
1994 | |
1995 all_public_keys = list({ | |
1996 bare_jid: list(self.list_public_keys(client, bare_jid)) | |
1997 for bare_jid | |
1998 in bare_jids | |
1999 }.items()) | |
2000 | |
2001 async def callback( | |
2002 data: Any, | |
2003 profile: str # pylint: disable=unused-argument | |
2004 ) -> Dict[Never, Never]: | |
2005 """ | |
2006 @param data: The XMLUI result produces by the trust UI form. | |
2007 @param profile: The profile. | |
2008 @return: An empty dictionary. The type of the return value was chosen | |
2009 conservatively since the exact options are neither known not needed here. | |
2010 """ | |
2011 | |
2012 if C.bool(data.get("cancelled", "false")): | |
2013 return {} | |
2014 | |
2015 data_form_result = cast( | |
2016 Dict[str, str], | |
2017 xml_tools.XMLUIResult2DataFormResult(data) | |
2018 ) | |
2019 for key, value in data_form_result.items(): | |
2020 if not key.startswith("trust_"): | |
2021 continue | |
2022 | |
2023 outer_index, inner_index = key.split("_")[1:] | |
2024 | |
2025 owner, public_keys = all_public_keys[int(outer_index)] | |
2026 public_key = public_keys[int(inner_index)] | |
2027 trust = TrustLevel(value) | |
2028 | |
2029 if (await self.get_trust(client, public_key, owner)) is not trust: | |
2030 await self.set_trust(client, public_key, owner, value) | |
2031 | |
2032 return {} | |
2033 | |
2034 submit_id = self.__sat.registerCallback(callback, with_data=True, one_shot=True) | |
2035 | |
2036 result = xml_tools.XMLUI( | |
2037 panel_type=C.XMLUI_FORM, | |
2038 title=D_("OX trust management"), | |
2039 submit_id=submit_id | |
2040 ) | |
2041 # Casting this to Any, otherwise all calls on the variable cause type errors | |
2042 # pylint: disable=no-member | |
2043 trust_ui = cast(Any, result) | |
2044 trust_ui.addText(D_( | |
2045 "This is OX trusting system. You'll see below the GPG keys of your " | |
2046 "contacts, and a list selection to trust them or not. A trusted key " | |
2047 "can read your messages in plain text, so be sure to only validate " | |
2048 "keys that you are sure are belonging to your contact. It's better " | |
2049 "to do this when you are next to your contact, so " | |
2050 "you can check the \"fingerprint\" of the key " | |
2051 "yourself. Do *not* validate a key if the fingerprint is wrong!" | |
2052 )) | |
2053 | |
2054 own_secret_keys = self.list_secret_keys(client) | |
2055 | |
2056 trust_ui.changeContainer("label") | |
2057 for index, secret_key in enumerate(own_secret_keys): | |
2058 trust_ui.addLabel(D_(f"Own secret key {index} fingerprint")) | |
2059 trust_ui.addText(secret_key.public_key.fingerprint) | |
2060 trust_ui.addEmpty() | |
2061 trust_ui.addEmpty() | |
2062 | |
2063 for outer_index, [ owner, public_keys ] in enumerate(all_public_keys): | |
2064 for inner_index, public_key in enumerate(public_keys): | |
2065 trust_ui.addLabel(D_("Contact")) | |
2066 trust_ui.addJid(jid.JID(owner)) | |
2067 trust_ui.addLabel(D_("Fingerprint")) | |
2068 trust_ui.addText(public_key.fingerprint) | |
2069 trust_ui.addLabel(D_("Trust this device?")) | |
2070 | |
2071 current_trust_level = await self.get_trust(client, public_key, owner) | |
2072 avaiable_trust_levels = \ | |
2073 { TrustLevel.DISTRUSTED, TrustLevel.TRUSTED, current_trust_level } | |
2074 | |
2075 trust_ui.addList( | |
2076 f"trust_{outer_index}_{inner_index}", | |
2077 options=[ trust_level.name for trust_level in avaiable_trust_levels ], | |
2078 selected=current_trust_level.name, | |
2079 styles=[ "inline" ] | |
2080 ) | |
2081 | |
2082 trust_ui.addEmpty() | |
2083 trust_ui.addEmpty() | |
2084 | |
2085 return result |