Mercurial > libervia-backend
comparison libervia/backend/plugins/plugin_xep_0373.py @ 4270:0d7bb4df2343
Reformatted code base using black.
author | Goffi <goffi@goffi.org> |
---|---|
date | Wed, 19 Jun 2024 18:44:57 +0200 |
parents | b53b6dc1f929 |
children | 71c939e34ca6 |
comparison
equal
deleted
inserted
replaced
4269:64a85ce8be70 | 4270:0d7bb4df2343 |
---|---|
73 "GPGPublicKey", | 73 "GPGPublicKey", |
74 "GPGSecretKey", | 74 "GPGSecretKey", |
75 "GPGProvider", | 75 "GPGProvider", |
76 "PublicKeyMetadata", | 76 "PublicKeyMetadata", |
77 "gpg_provider", | 77 "gpg_provider", |
78 "TrustLevel" | 78 "TrustLevel", |
79 ] | 79 ] |
80 | 80 |
81 | 81 |
82 log = cast(Logger, getLogger(__name__)) # type: ignore[no-untyped-call] | 82 log = cast(Logger, getLogger(__name__)) # type: ignore[no-untyped-call] |
83 | 83 |
84 | 84 |
85 PLUGIN_INFO = { | 85 PLUGIN_INFO = { |
86 C.PI_NAME: "XEP-0373", | 86 C.PI_NAME: "XEP-0373", |
87 C.PI_IMPORT_NAME: "XEP-0373", | 87 C.PI_IMPORT_NAME: "XEP-0373", |
88 C.PI_TYPE: "SEC", | 88 C.PI_TYPE: "SEC", |
89 C.PI_PROTOCOLS: [ "XEP-0373" ], | 89 C.PI_PROTOCOLS: ["XEP-0373"], |
90 C.PI_DEPENDENCIES: [ "XEP-0060", "XEP-0163" ], | 90 C.PI_DEPENDENCIES: ["XEP-0060", "XEP-0163"], |
91 C.PI_RECOMMENDATIONS: [], | 91 C.PI_RECOMMENDATIONS: [], |
92 C.PI_MAIN: "XEP_0373", | 92 C.PI_MAIN: "XEP_0373", |
93 C.PI_HANDLER: "no", | 93 C.PI_HANDLER: "no", |
94 C.PI_DESCRIPTION: D_("Implementation of OpenPGP for XMPP"), | 94 C.PI_DESCRIPTION: D_("Implementation of OpenPGP for XMPP"), |
95 } | 95 } |
304 of the public keys is available. | 304 of the public keys is available. |
305 """ | 305 """ |
306 | 306 |
307 @abstractmethod | 307 @abstractmethod |
308 def verify_detached( | 308 def verify_detached( |
309 self, | 309 self, data: bytes, signature: bytes, public_keys: Set[GPGPublicKey] |
310 data: bytes, | |
311 signature: bytes, | |
312 public_keys: Set[GPGPublicKey] | |
313 ) -> None: | 310 ) -> None: |
314 """Verify signed data, where the signature was created detached from the data. | 311 """Verify signed data, where the signature was created detached from the data. |
315 | 312 |
316 OpenPGP's ASCII Armor is not used. | 313 OpenPGP's ASCII Armor is not used. |
317 | 314 |
327 @abstractmethod | 324 @abstractmethod |
328 def encrypt( | 325 def encrypt( |
329 self, | 326 self, |
330 plaintext: bytes, | 327 plaintext: bytes, |
331 public_keys: Set[GPGPublicKey], | 328 public_keys: Set[GPGPublicKey], |
332 signing_keys: Optional[Set[GPGSecretKey]] = None | 329 signing_keys: Optional[Set[GPGSecretKey]] = None, |
333 ) -> bytes: | 330 ) -> bytes: |
334 """Encrypt and optionally sign some data. | 331 """Encrypt and optionally sign some data. |
335 | 332 |
336 OpenPGP's ASCII Armor is not used. | 333 OpenPGP's ASCII Armor is not used. |
337 | 334 |
344 @abstractmethod | 341 @abstractmethod |
345 def decrypt( | 342 def decrypt( |
346 self, | 343 self, |
347 ciphertext: bytes, | 344 ciphertext: bytes, |
348 secret_keys: Set[GPGSecretKey], | 345 secret_keys: Set[GPGSecretKey], |
349 public_keys: Optional[Set[GPGPublicKey]] = None | 346 public_keys: Optional[Set[GPGPublicKey]] = None, |
350 ) -> bytes: | 347 ) -> bytes: |
351 """Decrypt and optionally verify some data. | 348 """Decrypt and optionally verify some data. |
352 | 349 |
353 OpenPGP's ASCII Armor is not used. | 350 OpenPGP's ASCII Armor is not used. |
354 | 351 |
567 | 564 |
568 def decrypt_symmetrically(self, ciphertext: bytes, password: str) -> bytes: | 565 def decrypt_symmetrically(self, ciphertext: bytes, password: str) -> bytes: |
569 with gpg.Context(home_dir=self.__home_dir) as c: | 566 with gpg.Context(home_dir=self.__home_dir) as c: |
570 try: | 567 try: |
571 plaintext, __, __ = c.decrypt( | 568 plaintext, __, __ = c.decrypt( |
572 ciphertext, | 569 ciphertext, passphrase=password, verify=False |
573 passphrase=password, | |
574 verify=False | |
575 ) | 570 ) |
576 except gpg.errors.GPGMEError as e: | 571 except gpg.errors.GPGMEError as e: |
577 # TODO: Find out what kind of error is raised if the password is wrong and | 572 # TODO: Find out what kind of error is raised if the password is wrong and |
578 # re-raise it as DecryptionFailed instead. | 573 # re-raise it as DecryptionFailed instead. |
579 raise GPGProviderError("Internal GPGME error") from e | 574 raise GPGProviderError("Internal GPGME error") from e |
644 ) | 639 ) |
645 | 640 |
646 return data | 641 return data |
647 | 642 |
648 def verify_detached( | 643 def verify_detached( |
649 self, | 644 self, data: bytes, signature: bytes, public_keys: Set[GPGPublicKey] |
650 data: bytes, | |
651 signature: bytes, | |
652 public_keys: Set[GPGPublicKey] | |
653 ) -> None: | 645 ) -> None: |
654 with gpg.Context(home_dir=self.__home_dir) as c: | 646 with gpg.Context(home_dir=self.__home_dir) as c: |
655 try: | 647 try: |
656 __, result = c.verify(data, signature=signature) | 648 __, result = c.verify(data, signature=signature) |
657 except gpg.errors.GPGMEError as e: | 649 except gpg.errors.GPGMEError as e: |
675 | 667 |
676 def encrypt( | 668 def encrypt( |
677 self, | 669 self, |
678 plaintext: bytes, | 670 plaintext: bytes, |
679 public_keys: Set[GPGPublicKey], | 671 public_keys: Set[GPGPublicKey], |
680 signing_keys: Optional[Set[GPGSecretKey]] = None | 672 signing_keys: Optional[Set[GPGSecretKey]] = None, |
681 ) -> bytes: | 673 ) -> bytes: |
682 recipients = [] | 674 recipients = [] |
683 for public_key in public_keys: | 675 for public_key in public_keys: |
684 assert isinstance(public_key, GPGME_GPGPublicKey) | 676 assert isinstance(public_key, GPGME_GPGPublicKey) |
685 | 677 |
699 ciphertext, __, __ = c.encrypt( | 691 ciphertext, __, __ = c.encrypt( |
700 plaintext, | 692 plaintext, |
701 recipients=recipients, | 693 recipients=recipients, |
702 sign=sign, | 694 sign=sign, |
703 always_trust=True, | 695 always_trust=True, |
704 add_encrypt_to=True | 696 add_encrypt_to=True, |
705 ) | 697 ) |
706 except gpg.errors.GPGMEError as e: | 698 except gpg.errors.GPGMEError as e: |
707 raise GPGProviderError("Internal GPGME error") from e | 699 raise GPGProviderError("Internal GPGME error") from e |
708 except gpg.errors.InvalidRecipients as e: | 700 except gpg.errors.InvalidRecipients as e: |
709 raise GPGProviderError( | 701 raise GPGProviderError( |
718 | 710 |
719 def decrypt( | 711 def decrypt( |
720 self, | 712 self, |
721 ciphertext: bytes, | 713 ciphertext: bytes, |
722 secret_keys: Set[GPGSecretKey], | 714 secret_keys: Set[GPGSecretKey], |
723 public_keys: Optional[Set[GPGPublicKey]] = None | 715 public_keys: Optional[Set[GPGPublicKey]] = None, |
724 ) -> bytes: | 716 ) -> bytes: |
725 verify = public_keys is not None | 717 verify = public_keys is not None |
726 | 718 |
727 with gpg.Context(home_dir=self.__home_dir) as c: | 719 with gpg.Context(home_dir=self.__home_dir) as c: |
728 try: | 720 try: |
729 plaintext, result, verify_result = c.decrypt( | 721 plaintext, result, verify_result = c.decrypt(ciphertext, verify=verify) |
730 ciphertext, | |
731 verify=verify | |
732 ) | |
733 except gpg.errors.GPGMEError as e: | 722 except gpg.errors.GPGMEError as e: |
734 raise GPGProviderError("Internal GPGME error") from e | 723 raise GPGProviderError("Internal GPGME error") from e |
735 except gpg.UnsupportedAlgorithm as e: | 724 except gpg.UnsupportedAlgorithm as e: |
736 raise DecryptionFailed("Unsupported algorithm") from e | 725 raise DecryptionFailed("Unsupported algorithm") from e |
737 | 726 |
758 def list_public_keys(self, user_id: str) -> Set[GPGPublicKey]: | 747 def list_public_keys(self, user_id: str) -> Set[GPGPublicKey]: |
759 with gpg.Context(home_dir=self.__home_dir) as c: | 748 with gpg.Context(home_dir=self.__home_dir) as c: |
760 try: | 749 try: |
761 return { | 750 return { |
762 GPGME_GPGPublicKey(key) | 751 GPGME_GPGPublicKey(key) |
763 for key | 752 for key in c.keylist(pattern=user_id, secret=False) |
764 in c.keylist(pattern=user_id, secret=False) | |
765 } | 753 } |
766 except gpg.errors.GPGMEError as e: | 754 except gpg.errors.GPGMEError as e: |
767 raise GPGProviderError("Internal GPGME error") from e | 755 raise GPGProviderError("Internal GPGME error") from e |
768 | 756 |
769 def list_secret_keys(self, user_id: str) -> Set[GPGSecretKey]: | 757 def list_secret_keys(self, user_id: str) -> Set[GPGSecretKey]: |
770 with gpg.Context(home_dir=self.__home_dir) as c: | 758 with gpg.Context(home_dir=self.__home_dir) as c: |
771 try: | 759 try: |
772 return { | 760 return { |
773 GPGME_GPGSecretKey(GPGME_GPGPublicKey(key)) | 761 GPGME_GPGSecretKey(GPGME_GPGPublicKey(key)) |
774 for key | 762 for key in c.keylist(pattern=user_id, secret=True) |
775 in c.keylist(pattern=user_id, secret=True) | |
776 } | 763 } |
777 except gpg.errors.GPGMEError as e: | 764 except gpg.errors.GPGMEError as e: |
778 raise GPGProviderError("Internal GPGME error") from e | 765 raise GPGProviderError("Internal GPGME error") from e |
779 | 766 |
780 def can_sign(self, public_key: GPGPublicKey) -> bool: | 767 def can_sign(self, public_key: GPGPublicKey) -> bool: |
795 expires=False, | 782 expires=False, |
796 sign=True, | 783 sign=True, |
797 encrypt=True, | 784 encrypt=True, |
798 certify=False, | 785 certify=False, |
799 authenticate=False, | 786 authenticate=False, |
800 force=True | 787 force=True, |
801 ) | 788 ) |
802 | 789 |
803 key_obj = c.get_key(result.fpr, secret=True) | 790 key_obj = c.get_key(result.fpr, secret=True) |
804 except gpg.errors.GPGMEError as e: | 791 except gpg.errors.GPGMEError as e: |
805 raise GPGProviderError("Internal GPGME error") from e | 792 raise GPGProviderError("Internal GPGME error") from e |
811 | 798 |
812 class PublicKeyMetadata(NamedTuple): | 799 class PublicKeyMetadata(NamedTuple): |
813 """ | 800 """ |
814 Metadata about a published public key. | 801 Metadata about a published public key. |
815 """ | 802 """ |
803 | |
816 fingerprint: str | 804 fingerprint: str |
817 timestamp: datetime | 805 timestamp: datetime |
818 | 806 |
819 def to_dict(self) -> dict: | 807 def to_dict(self) -> dict: |
820 # Convert the instance to a dictionary and handle datetime serialization | 808 # Convert the instance to a dictionary and handle datetime serialization |
821 data = self._asdict() | 809 data = self._asdict() |
822 data['timestamp'] = self.timestamp.isoformat() | 810 data["timestamp"] = self.timestamp.isoformat() |
823 return data | 811 return data |
824 | 812 |
825 @staticmethod | 813 @staticmethod |
826 def from_dict(data: dict) -> 'PublicKeyMetadata': | 814 def from_dict(data: dict) -> "PublicKeyMetadata": |
827 # Load a serialised dictionary | 815 # Load a serialised dictionary |
828 data['timestamp'] = datetime.fromisoformat(data['timestamp']) | 816 data["timestamp"] = datetime.fromisoformat(data["timestamp"]) |
829 return PublicKeyMetadata(**data) | 817 return PublicKeyMetadata(**data) |
830 | 818 |
831 | 819 |
832 @enum.unique | 820 @enum.unique |
833 class TrustLevel(enum.Enum): | 821 class TrustLevel(enum.Enum): |
839 BLINDLY_TRUSTED: str = "BLINDLY_TRUSTED" | 827 BLINDLY_TRUSTED: str = "BLINDLY_TRUSTED" |
840 UNDECIDED: str = "UNDECIDED" | 828 UNDECIDED: str = "UNDECIDED" |
841 DISTRUSTED: str = "DISTRUSTED" | 829 DISTRUSTED: str = "DISTRUSTED" |
842 | 830 |
843 | 831 |
844 OPENPGP_SCHEMA = xmlschema.XMLSchema("""<?xml version="1.0" encoding="utf8"?> | 832 OPENPGP_SCHEMA = xmlschema.XMLSchema( |
833 """<?xml version="1.0" encoding="utf8"?> | |
845 <xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema" | 834 <xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema" |
846 targetNamespace="urn:xmpp:openpgp:0" | 835 targetNamespace="urn:xmpp:openpgp:0" |
847 xmlns="urn:xmpp:openpgp:0"> | 836 xmlns="urn:xmpp:openpgp:0"> |
848 | 837 |
849 <xs:element name="openpgp" type="xs:base64Binary"/> | 838 <xs:element name="openpgp" type="xs:base64Binary"/> |
850 </xs:schema> | 839 </xs:schema> |
851 """) | 840 """ |
841 ) | |
852 | 842 |
853 | 843 |
854 # The following schema needs verion 1.1 of XML Schema, which is not supported by lxml. | 844 # The following schema needs verion 1.1 of XML Schema, which is not supported by lxml. |
855 # Luckily, xmlschema exists, which is a clean, well maintained, cross-platform | 845 # Luckily, xmlschema exists, which is a clean, well maintained, cross-platform |
856 # implementation of XML Schema, including version 1.1. | 846 # implementation of XML Schema, including version 1.1. |
857 CONTENT_SCHEMA = xmlschema.XMLSchema11("""<?xml version="1.1" encoding="utf8"?> | 847 CONTENT_SCHEMA = xmlschema.XMLSchema11( |
848 """<?xml version="1.1" encoding="utf8"?> | |
858 <xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema" | 849 <xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema" |
859 targetNamespace="urn:xmpp:openpgp:0" | 850 targetNamespace="urn:xmpp:openpgp:0" |
860 xmlns="urn:xmpp:openpgp:0"> | 851 xmlns="urn:xmpp:openpgp:0"> |
861 | 852 |
862 <xs:element name="signcrypt"> | 853 <xs:element name="signcrypt"> |
912 <xs:any minOccurs="0" maxOccurs="unbounded" processContents="skip"/> | 903 <xs:any minOccurs="0" maxOccurs="unbounded" processContents="skip"/> |
913 </xs:sequence> | 904 </xs:sequence> |
914 </xs:complexType> | 905 </xs:complexType> |
915 </xs:element> | 906 </xs:element> |
916 </xs:schema> | 907 </xs:schema> |
917 """) | 908 """ |
909 ) | |
918 | 910 |
919 | 911 |
920 PUBLIC_KEYS_LIST_NODE = "urn:xmpp:openpgp:0:public-keys" | 912 PUBLIC_KEYS_LIST_NODE = "urn:xmpp:openpgp:0:public-keys" |
921 PUBLIC_KEYS_LIST_SCHEMA = xmlschema.XMLSchema("""<?xml version="1.0" encoding="utf8"?> | 913 PUBLIC_KEYS_LIST_SCHEMA = xmlschema.XMLSchema( |
914 """<?xml version="1.0" encoding="utf8"?> | |
922 <xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema" | 915 <xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema" |
923 targetNamespace="urn:xmpp:openpgp:0" | 916 targetNamespace="urn:xmpp:openpgp:0" |
924 xmlns="urn:xmpp:openpgp:0"> | 917 xmlns="urn:xmpp:openpgp:0"> |
925 | 918 |
926 <xs:element name="public-keys-list"> | 919 <xs:element name="public-keys-list"> |
936 <xs:attribute name="v4-fingerprint" type="xs:string"/> | 929 <xs:attribute name="v4-fingerprint" type="xs:string"/> |
937 <xs:attribute name="date" type="xs:dateTime"/> | 930 <xs:attribute name="date" type="xs:dateTime"/> |
938 </xs:complexType> | 931 </xs:complexType> |
939 </xs:element> | 932 </xs:element> |
940 </xs:schema> | 933 </xs:schema> |
941 """) | 934 """ |
942 | 935 ) |
943 | 936 |
944 PUBKEY_SCHEMA = xmlschema.XMLSchema("""<?xml version="1.0" encoding="utf8"?> | 937 |
938 PUBKEY_SCHEMA = xmlschema.XMLSchema( | |
939 """<?xml version="1.0" encoding="utf8"?> | |
945 <xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema" | 940 <xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema" |
946 targetNamespace="urn:xmpp:openpgp:0" | 941 targetNamespace="urn:xmpp:openpgp:0" |
947 xmlns="urn:xmpp:openpgp:0"> | 942 xmlns="urn:xmpp:openpgp:0"> |
948 | 943 |
949 <xs:element name="pubkey"> | 944 <xs:element name="pubkey"> |
955 </xs:complexType> | 950 </xs:complexType> |
956 </xs:element> | 951 </xs:element> |
957 | 952 |
958 <xs:element name="data" type="xs:base64Binary"/> | 953 <xs:element name="data" type="xs:base64Binary"/> |
959 </xs:schema> | 954 </xs:schema> |
960 """) | 955 """ |
961 | 956 ) |
962 | 957 |
963 SECRETKEY_SCHEMA = xmlschema.XMLSchema("""<?xml version="1.0" encoding="utf8"?> | 958 |
959 SECRETKEY_SCHEMA = xmlschema.XMLSchema( | |
960 """<?xml version="1.0" encoding="utf8"?> | |
964 <xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema" | 961 <xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema" |
965 targetNamespace="urn:xmpp:openpgp:0" | 962 targetNamespace="urn:xmpp:openpgp:0" |
966 xmlns="urn:xmpp:openpgp:0"> | 963 xmlns="urn:xmpp:openpgp:0"> |
967 | 964 |
968 <xs:element name="secretkey" type="xs:base64Binary"/> | 965 <xs:element name="secretkey" type="xs:base64Binary"/> |
969 </xs:schema> | 966 </xs:schema> |
970 """) | 967 """ |
968 ) | |
971 | 969 |
972 | 970 |
973 DEFAULT_TRUST_MODEL_PARAM = f""" | 971 DEFAULT_TRUST_MODEL_PARAM = f""" |
974 <params> | 972 <params> |
975 <individual> | 973 <individual> |
1003 """Generate a secure passphrase for symmetric encryption. | 1001 """Generate a secure passphrase for symmetric encryption. |
1004 | 1002 |
1005 @return: The passphrase. | 1003 @return: The passphrase. |
1006 """ | 1004 """ |
1007 | 1005 |
1008 return "-".join("".join( | 1006 return "-".join( |
1009 secrets.choice("123456789ABCDEFGHIJKLMNPQRSTUVWXYZ") for __ in range(4) | 1007 "".join(secrets.choice("123456789ABCDEFGHIJKLMNPQRSTUVWXYZ") for __ in range(4)) |
1010 ) for __ in range(6)) | 1008 for __ in range(6) |
1009 ) | |
1011 | 1010 |
1012 | 1011 |
1013 # TODO: Handle the user id mess | 1012 # TODO: Handle the user id mess |
1014 class XEP_0373: | 1013 class XEP_0373: |
1015 """ | 1014 """ |
1036 xep_0163.add_pep_event( | 1035 xep_0163.add_pep_event( |
1037 "OX_PUBLIC_KEYS_LIST", | 1036 "OX_PUBLIC_KEYS_LIST", |
1038 PUBLIC_KEYS_LIST_NODE, | 1037 PUBLIC_KEYS_LIST_NODE, |
1039 lambda items_event, profile: defer.ensureDeferred( | 1038 lambda items_event, profile: defer.ensureDeferred( |
1040 self.__on_public_keys_list_update(items_event, profile) | 1039 self.__on_public_keys_list_update(items_event, profile) |
1041 ) | 1040 ), |
1042 ) | 1041 ) |
1043 | 1042 |
1044 async def profile_connecting(self, client): | 1043 async def profile_connecting(self, client): |
1045 client.gpg_provider = get_gpg_provider(self.host, client) | 1044 client.gpg_provider = get_gpg_provider(self.host, client) |
1046 | 1045 |
1047 async def profile_connected( # pylint: disable=invalid-name | 1046 async def profile_connected( # pylint: disable=invalid-name |
1048 self, | 1047 self, client: SatXMPPClient |
1049 client: SatXMPPClient | |
1050 ) -> None: | 1048 ) -> None: |
1051 """ | 1049 """ |
1052 @param client: The client. | 1050 @param client: The client. |
1053 """ | 1051 """ |
1054 | 1052 |
1055 profile = cast(str, client.profile) | 1053 profile = cast(str, client.profile) |
1056 | 1054 |
1057 if not profile in self.__storage: | 1055 if not profile in self.__storage: |
1058 self.__storage[profile] = \ | 1056 self.__storage[profile] = persistent.LazyPersistentBinaryDict( |
1059 persistent.LazyPersistentBinaryDict("XEP-0373", client.profile) | 1057 "XEP-0373", client.profile |
1058 ) | |
1060 | 1059 |
1061 if len(self.list_secret_keys(client)) == 0: | 1060 if len(self.list_secret_keys(client)) == 0: |
1062 log.debug(f"Generating first GPG key for {client.jid.userhost()}.") | 1061 log.debug(f"Generating first GPG key for {client.jid.userhost()}.") |
1063 await self.create_key(client) | 1062 await self.create_key(client) |
1064 | 1063 |
1065 async def __on_public_keys_list_update( | 1064 async def __on_public_keys_list_update( |
1066 self, | 1065 self, items_event: pubsub.ItemsEvent, profile: str |
1067 items_event: pubsub.ItemsEvent, | |
1068 profile: str | |
1069 ) -> None: | 1066 ) -> None: |
1070 """Handle public keys list updates fired by PEP. | 1067 """Handle public keys list updates fired by PEP. |
1071 | 1068 |
1072 @param items_event: The event. | 1069 @param items_event: The event. |
1073 @param profile: The profile this event belongs to. | 1070 @param profile: The profile this event belongs to. |
1087 log.debug("Ignoring empty public keys list update.") | 1084 log.debug("Ignoring empty public keys list update.") |
1088 return | 1085 return |
1089 | 1086 |
1090 public_keys_list_elt = cast( | 1087 public_keys_list_elt = cast( |
1091 Optional[domish.Element], | 1088 Optional[domish.Element], |
1092 next(item_elt.elements(NS_OX, "public-keys-list"), None) | 1089 next(item_elt.elements(NS_OX, "public-keys-list"), None), |
1093 ) | 1090 ) |
1094 | 1091 |
1095 pubkey_metadata_elts: Optional[List[domish.Element]] = None | 1092 pubkey_metadata_elts: Optional[List[domish.Element]] = None |
1096 | 1093 |
1097 if public_keys_list_elt is not None: | 1094 if public_keys_list_elt is not None: |
1098 try: | 1095 try: |
1099 PUBLIC_KEYS_LIST_SCHEMA.validate(public_keys_list_elt.toXml()) | 1096 PUBLIC_KEYS_LIST_SCHEMA.validate(public_keys_list_elt.toXml()) |
1100 except xmlschema.XMLSchemaValidationError: | 1097 except xmlschema.XMLSchemaValidationError: |
1101 pass | 1098 pass |
1102 else: | 1099 else: |
1103 pubkey_metadata_elts = \ | 1100 pubkey_metadata_elts = list( |
1104 list(public_keys_list_elt.elements(NS_OX, "pubkey-metadata")) | 1101 public_keys_list_elt.elements(NS_OX, "pubkey-metadata") |
1102 ) | |
1105 | 1103 |
1106 if pubkey_metadata_elts is None: | 1104 if pubkey_metadata_elts is None: |
1107 log.warning(f"Malformed public keys list update item: {item_elt.toXml()}") | 1105 log.warning(f"Malformed public keys list update item: {item_elt.toXml()}") |
1108 return | 1106 return |
1109 | 1107 |
1110 new_public_keys_metadata = { PublicKeyMetadata( | 1108 new_public_keys_metadata = { |
1111 fingerprint=cast(str, pubkey_metadata_elt["v4-fingerprint"]), | 1109 PublicKeyMetadata( |
1112 timestamp=parse_datetime(cast(str, pubkey_metadata_elt["date"])) | 1110 fingerprint=cast(str, pubkey_metadata_elt["v4-fingerprint"]), |
1113 ) for pubkey_metadata_elt in pubkey_metadata_elts } | 1111 timestamp=parse_datetime(cast(str, pubkey_metadata_elt["date"])), |
1112 ) | |
1113 for pubkey_metadata_elt in pubkey_metadata_elts | |
1114 } | |
1114 | 1115 |
1115 storage_key = STR_KEY_PUBLIC_KEYS_METADATA.format(sender.userhost()) | 1116 storage_key = STR_KEY_PUBLIC_KEYS_METADATA.format(sender.userhost()) |
1116 | 1117 |
1117 local_public_keys_metadata = { | 1118 local_public_keys_metadata = { |
1118 PublicKeyMetadata.from_dict(pkm) | 1119 PublicKeyMetadata.from_dict(pkm) |
1138 | 1139 |
1139 # Check whether this update was for our account and make sure all of our keys are | 1140 # Check whether this update was for our account and make sure all of our keys are |
1140 # included in the update | 1141 # included in the update |
1141 if sender.userhost() == client.jid.userhost(): | 1142 if sender.userhost() == client.jid.userhost(): |
1142 secret_keys = self.list_secret_keys(client) | 1143 secret_keys = self.list_secret_keys(client) |
1143 missing_keys = set(filter(lambda secret_key: all( | 1144 missing_keys = set( |
1144 key_metadata.fingerprint != secret_key.public_key.fingerprint | 1145 filter( |
1145 for key_metadata | 1146 lambda secret_key: all( |
1146 in new_public_keys_metadata | 1147 key_metadata.fingerprint != secret_key.public_key.fingerprint |
1147 ), secret_keys)) | 1148 for key_metadata in new_public_keys_metadata |
1149 ), | |
1150 secret_keys, | |
1151 ) | |
1152 ) | |
1148 | 1153 |
1149 if len(missing_keys) > 0: | 1154 if len(missing_keys) > 0: |
1150 log.warning( | 1155 log.warning( |
1151 "Public keys list update did not contain at least one of our keys." | 1156 "Public keys list update did not contain at least one of our keys." |
1152 f" {new_public_keys_metadata}" | 1157 f" {new_public_keys_metadata}" |
1153 ) | 1158 ) |
1154 | 1159 |
1155 for missing_key in missing_keys: | 1160 for missing_key in missing_keys: |
1156 log.warning(missing_key.public_key.fingerprint) | 1161 log.warning(missing_key.public_key.fingerprint) |
1157 new_public_keys_metadata.add(PublicKeyMetadata( | 1162 new_public_keys_metadata.add( |
1158 fingerprint=missing_key.public_key.fingerprint, | 1163 PublicKeyMetadata( |
1159 timestamp=datetime.now(timezone.utc) | 1164 fingerprint=missing_key.public_key.fingerprint, |
1160 )) | 1165 timestamp=datetime.now(timezone.utc), |
1166 ) | |
1167 ) | |
1161 | 1168 |
1162 await self.publish_public_keys_list(client, new_public_keys_metadata) | 1169 await self.publish_public_keys_list(client, new_public_keys_metadata) |
1163 | 1170 |
1164 await self.__storage[profile].force( | 1171 await self.__storage[profile].force( |
1165 storage_key, | 1172 storage_key, [pkm.to_dict() for pkm in new_public_keys_metadata] |
1166 [pkm.to_dict() for pkm in new_public_keys_metadata] | |
1167 ) | 1173 ) |
1168 | 1174 |
1169 def list_public_keys(self, client: SatXMPPClient, jid: jid.JID) -> Set[GPGPublicKey]: | 1175 def list_public_keys(self, client: SatXMPPClient, jid: jid.JID) -> Set[GPGPublicKey]: |
1170 """List GPG public keys available for a JID. | 1176 """List GPG public keys available for a JID. |
1171 | 1177 |
1209 public_keys_list = { | 1215 public_keys_list = { |
1210 PublicKeyMetadata.from_dict(pkm) | 1216 PublicKeyMetadata.from_dict(pkm) |
1211 for pkm in await self.__storage[client.profile].get(storage_key, []) | 1217 for pkm in await self.__storage[client.profile].get(storage_key, []) |
1212 } | 1218 } |
1213 | 1219 |
1214 public_keys_list.add(PublicKeyMetadata( | 1220 public_keys_list.add( |
1215 fingerprint=secret_key.public_key.fingerprint, | 1221 PublicKeyMetadata( |
1216 timestamp=datetime.now(timezone.utc) | 1222 fingerprint=secret_key.public_key.fingerprint, |
1217 )) | 1223 timestamp=datetime.now(timezone.utc), |
1224 ) | |
1225 ) | |
1218 | 1226 |
1219 await self.publish_public_keys_list(client, public_keys_list) | 1227 await self.publish_public_keys_list(client, public_keys_list) |
1220 | 1228 |
1221 await self.__storage[client.profile].force( | 1229 await self.__storage[client.profile].force( |
1222 storage_key, | 1230 storage_key, [pkm.to_dict() for pkm in public_keys_list] |
1223 [pkm.to_dict() for pkm in public_keys_list] | |
1224 ) | 1231 ) |
1225 | 1232 |
1226 return secret_key | 1233 return secret_key |
1227 | 1234 |
1228 @staticmethod | 1235 @staticmethod |
1229 def __build_content_element( | 1236 def __build_content_element( |
1230 element_name: Literal["signcrypt", "sign", "crypt"], | 1237 element_name: Literal["signcrypt", "sign", "crypt"], |
1231 recipient_jids: Iterable[jid.JID], | 1238 recipient_jids: Iterable[jid.JID], |
1232 include_rpad: bool | 1239 include_rpad: bool, |
1233 ) -> Tuple[domish.Element, domish.Element]: | 1240 ) -> Tuple[domish.Element, domish.Element]: |
1234 """Build a content element. | 1241 """Build a content element. |
1235 | 1242 |
1236 @param element_name: The name of the content element. | 1243 @param element_name: The name of the content element. |
1237 @param recipient_jids: The intended recipients of this content element. Can be | 1244 @param recipient_jids: The intended recipients of this content element. Can be |
1252 # XEP-0373 doesn't specify bounds for the length of the random padding. This | 1259 # XEP-0373 doesn't specify bounds for the length of the random padding. This |
1253 # uses the bounds specified in XEP-0420 for the closely related rpad affix. | 1260 # uses the bounds specified in XEP-0420 for the closely related rpad affix. |
1254 rpad_length = secrets.randbelow(201) | 1261 rpad_length = secrets.randbelow(201) |
1255 rpad_content = "".join( | 1262 rpad_content = "".join( |
1256 secrets.choice(string.digits + string.ascii_letters + string.punctuation) | 1263 secrets.choice(string.digits + string.ascii_letters + string.punctuation) |
1257 for __ | 1264 for __ in range(rpad_length) |
1258 in range(rpad_length) | |
1259 ) | 1265 ) |
1260 content_elt.addElement("rpad", content=rpad_content) | 1266 content_elt.addElement("rpad", content=rpad_content) |
1261 | 1267 |
1262 payload_elt = content_elt.addElement("payload") | 1268 payload_elt = content_elt.addElement("payload") |
1263 | 1269 |
1264 return content_elt, payload_elt | 1270 return content_elt, payload_elt |
1265 | 1271 |
1266 @staticmethod | 1272 @staticmethod |
1267 def build_signcrypt_element( | 1273 def build_signcrypt_element( |
1268 recipient_jids: Iterable[jid.JID] | 1274 recipient_jids: Iterable[jid.JID], |
1269 ) -> Tuple[domish.Element, domish.Element]: | 1275 ) -> Tuple[domish.Element, domish.Element]: |
1270 """Build a ``<signcrypt/>`` content element. | 1276 """Build a ``<signcrypt/>`` content element. |
1271 | 1277 |
1272 @param recipient_jids: The intended recipients of this content element. Can be | 1278 @param recipient_jids: The intended recipients of this content element. Can be |
1273 bare JIDs. | 1279 bare JIDs. |
1280 | 1286 |
1281 return XEP_0373.__build_content_element("signcrypt", recipient_jids, True) | 1287 return XEP_0373.__build_content_element("signcrypt", recipient_jids, True) |
1282 | 1288 |
1283 @staticmethod | 1289 @staticmethod |
1284 def build_sign_element( | 1290 def build_sign_element( |
1285 recipient_jids: Iterable[jid.JID], | 1291 recipient_jids: Iterable[jid.JID], include_rpad: bool |
1286 include_rpad: bool | |
1287 ) -> Tuple[domish.Element, domish.Element]: | 1292 ) -> Tuple[domish.Element, domish.Element]: |
1288 """Build a ``<sign/>`` content element. | 1293 """Build a ``<sign/>`` content element. |
1289 | 1294 |
1290 @param recipient_jids: The intended recipients of this content element. Can be | 1295 @param recipient_jids: The intended recipients of this content element. Can be |
1291 bare JIDs. | 1296 bare JIDs. |
1300 | 1305 |
1301 return XEP_0373.__build_content_element("sign", recipient_jids, include_rpad) | 1306 return XEP_0373.__build_content_element("sign", recipient_jids, include_rpad) |
1302 | 1307 |
1303 @staticmethod | 1308 @staticmethod |
1304 def build_crypt_element( | 1309 def build_crypt_element( |
1305 recipient_jids: Iterable[jid.JID] | 1310 recipient_jids: Iterable[jid.JID], |
1306 ) -> Tuple[domish.Element, domish.Element]: | 1311 ) -> Tuple[domish.Element, domish.Element]: |
1307 """Build a ``<crypt/>`` content element. | 1312 """Build a ``<crypt/>`` content element. |
1308 | 1313 |
1309 @param recipient_jids: The intended recipients of this content element. Specifying | 1314 @param recipient_jids: The intended recipients of this content element. Specifying |
1310 the intended recipients is OPTIONAL for the ``<crypt/>`` content element. Can | 1315 the intended recipients is OPTIONAL for the ``<crypt/>`` content element. Can |
1317 | 1322 |
1318 async def build_openpgp_element( | 1323 async def build_openpgp_element( |
1319 self, | 1324 self, |
1320 client: SatXMPPClient, | 1325 client: SatXMPPClient, |
1321 content_elt: domish.Element, | 1326 content_elt: domish.Element, |
1322 recipient_jids: Set[jid.JID] | 1327 recipient_jids: Set[jid.JID], |
1323 ) -> domish.Element: | 1328 ) -> domish.Element: |
1324 """Build an ``<openpgp/>`` element. | 1329 """Build an ``<openpgp/>`` element. |
1325 | 1330 |
1326 @param client: The client to perform this operation with. | 1331 @param client: The client to perform this operation with. |
1327 @param content_elt: The content element to contain in the ``<openpgp/>`` element. | 1332 @param content_elt: The content element to contain in the ``<openpgp/>`` element. |
1331 | 1336 |
1332 gpg_provider = get_gpg_provider(self.host, client) | 1337 gpg_provider = get_gpg_provider(self.host, client) |
1333 | 1338 |
1334 # TODO: I'm not sure whether we want to sign with all keys by default or choose | 1339 # TODO: I'm not sure whether we want to sign with all keys by default or choose |
1335 # just one key/a subset of keys to sign with. | 1340 # just one key/a subset of keys to sign with. |
1336 signing_keys = set(filter( | 1341 signing_keys = set( |
1337 lambda secret_key: gpg_provider.can_sign(secret_key.public_key), | 1342 filter( |
1338 self.list_secret_keys(client) | 1343 lambda secret_key: gpg_provider.can_sign(secret_key.public_key), |
1339 )) | 1344 self.list_secret_keys(client), |
1345 ) | |
1346 ) | |
1340 | 1347 |
1341 encryption_keys: Set[GPGPublicKey] = set() | 1348 encryption_keys: Set[GPGPublicKey] = set() |
1342 | 1349 |
1343 for recipient_jid in recipient_jids: | 1350 for recipient_jid in recipient_jids: |
1344 # import all keys of the recipient | 1351 # import all keys of the recipient |
1368 async def unpack_openpgp_element( | 1375 async def unpack_openpgp_element( |
1369 self, | 1376 self, |
1370 client: SatXMPPClient, | 1377 client: SatXMPPClient, |
1371 openpgp_elt: domish.Element, | 1378 openpgp_elt: domish.Element, |
1372 element_name: Literal["signcrypt", "sign", "crypt"], | 1379 element_name: Literal["signcrypt", "sign", "crypt"], |
1373 sender_jid: jid.JID | 1380 sender_jid: jid.JID, |
1374 ) -> Tuple[domish.Element, datetime]: | 1381 ) -> Tuple[domish.Element, datetime]: |
1375 """Verify, decrypt and unpack an ``<openpgp/>`` element. | 1382 """Verify, decrypt and unpack an ``<openpgp/>`` element. |
1376 | 1383 |
1377 @param client: The client to perform this operation with. | 1384 @param client: The client to perform this operation with. |
1378 @param openpgp_elt: The ``<openpgp/>`` element. | 1385 @param openpgp_elt: The ``<openpgp/>`` element. |
1390 the calling code. | 1397 the calling code. |
1391 """ | 1398 """ |
1392 | 1399 |
1393 gpg_provider = get_gpg_provider(self.host, client) | 1400 gpg_provider = get_gpg_provider(self.host, client) |
1394 | 1401 |
1395 decryption_keys = set(filter( | 1402 decryption_keys = set( |
1396 lambda secret_key: gpg_provider.can_encrypt(secret_key.public_key), | 1403 filter( |
1397 self.list_secret_keys(client) | 1404 lambda secret_key: gpg_provider.can_encrypt(secret_key.public_key), |
1398 )) | 1405 self.list_secret_keys(client), |
1406 ) | |
1407 ) | |
1399 | 1408 |
1400 # import all keys of the sender | 1409 # import all keys of the sender |
1401 all_public_keys = await self.import_all_public_keys(client, sender_jid) | 1410 all_public_keys = await self.import_all_public_keys(client, sender_jid) |
1402 | 1411 |
1403 # Filter for keys that can sign | 1412 # Filter for keys that can sign |
1415 openpgp_message = base64.b64decode(str(openpgp_elt)) | 1424 openpgp_message = base64.b64decode(str(openpgp_elt)) |
1416 content: bytes | 1425 content: bytes |
1417 | 1426 |
1418 if element_name == "signcrypt": | 1427 if element_name == "signcrypt": |
1419 content = gpg_provider.decrypt( | 1428 content = gpg_provider.decrypt( |
1420 openpgp_message, | 1429 openpgp_message, decryption_keys, public_keys=verification_keys |
1421 decryption_keys, | |
1422 public_keys=verification_keys | |
1423 ) | 1430 ) |
1424 elif element_name == "sign": | 1431 elif element_name == "sign": |
1425 content = gpg_provider.verify(openpgp_message, verification_keys) | 1432 content = gpg_provider.verify(openpgp_message, verification_keys) |
1426 elif element_name == "crypt": | 1433 elif element_name == "crypt": |
1427 content = gpg_provider.decrypt(openpgp_message, decryption_keys) | 1434 content = gpg_provider.decrypt(openpgp_message, decryption_keys) |
1428 else: | 1435 else: |
1429 assert_never(element_name) | 1436 assert_never(element_name) |
1430 | 1437 |
1431 try: | 1438 try: |
1432 content_elt = cast( | 1439 content_elt = cast( |
1433 domish.Element, | 1440 domish.Element, xml_tools.ElementParser()(content.decode("utf-8")) |
1434 xml_tools.ElementParser()(content.decode("utf-8")) | |
1435 ) | 1441 ) |
1436 except UnicodeDecodeError as e: | 1442 except UnicodeDecodeError as e: |
1437 raise exceptions.ParsingError("UTF-8 decoding error") from e | 1443 raise exceptions.ParsingError("UTF-8 decoding error") from e |
1438 | 1444 |
1439 try: | 1445 try: |
1444 ) from e | 1450 ) from e |
1445 | 1451 |
1446 if content_elt.name != element_name: | 1452 if content_elt.name != element_name: |
1447 raise exceptions.ParsingError(f"Not a <{element_name}/> element.") | 1453 raise exceptions.ParsingError(f"Not a <{element_name}/> element.") |
1448 | 1454 |
1449 recipient_jids = \ | 1455 recipient_jids = { |
1450 { jid.JID(to_elt["jid"]) for to_elt in content_elt.elements(NS_OX, "to") } | 1456 jid.JID(to_elt["jid"]) for to_elt in content_elt.elements(NS_OX, "to") |
1457 } | |
1451 | 1458 |
1452 if ( | 1459 if ( |
1453 client.jid.userhostJID() not in { jid.userhostJID() for jid in recipient_jids } | 1460 client.jid.userhostJID() not in {jid.userhostJID() for jid in recipient_jids} |
1454 and element_name != "crypt" | 1461 and element_name != "crypt" |
1455 ): | 1462 ): |
1456 raise VerificationError( | 1463 raise VerificationError( |
1457 f"Recipient list in <{element_name}/> element does not list our (bare)" | 1464 f"Recipient list in <{element_name}/> element does not list our (bare)" |
1458 f" JID." | 1465 f" JID." |
1465 payload_elt = next(content_elt.elements(NS_OX, "payload")) | 1472 payload_elt = next(content_elt.elements(NS_OX, "payload")) |
1466 | 1473 |
1467 return payload_elt, timestamp | 1474 return payload_elt, timestamp |
1468 | 1475 |
1469 async def publish_public_key( | 1476 async def publish_public_key( |
1470 self, | 1477 self, client: SatXMPPClient, public_key: GPGPublicKey |
1471 client: SatXMPPClient, | |
1472 public_key: GPGPublicKey | |
1473 ) -> None: | 1478 ) -> None: |
1474 """Publish a public key. | 1479 """Publish a public key. |
1475 | 1480 |
1476 @param client: The client. | 1481 @param client: The client. |
1477 @param public_key: The public key to publish. | 1482 @param public_key: The public key to publish. |
1497 format_datetime(), | 1502 format_datetime(), |
1498 extra={ | 1503 extra={ |
1499 XEP_0060.EXTRA_PUBLISH_OPTIONS: { | 1504 XEP_0060.EXTRA_PUBLISH_OPTIONS: { |
1500 XEP_0060.OPT_PERSIST_ITEMS: "true", | 1505 XEP_0060.OPT_PERSIST_ITEMS: "true", |
1501 XEP_0060.OPT_ACCESS_MODEL: "open", | 1506 XEP_0060.OPT_ACCESS_MODEL: "open", |
1502 XEP_0060.OPT_MAX_ITEMS: 1 | 1507 XEP_0060.OPT_MAX_ITEMS: 1, |
1503 }, | 1508 }, |
1504 # TODO: Do we really want publish_without_options here? | 1509 # TODO: Do we really want publish_without_options here? |
1505 XEP_0060.EXTRA_ON_PRECOND_NOT_MET: "publish_without_options" | 1510 XEP_0060.EXTRA_ON_PRECOND_NOT_MET: "publish_without_options", |
1506 } | 1511 }, |
1507 ) | 1512 ) |
1508 except Exception as e: | 1513 except Exception as e: |
1509 raise XMPPInteractionFailed("Publishing the public key failed.") from e | 1514 raise XMPPInteractionFailed("Publishing the public key failed.") from e |
1510 | 1515 |
1511 async def import_all_public_keys( | 1516 async def import_all_public_keys( |
1512 self, | 1517 self, client: SatXMPPClient, entity_jid: jid.JID |
1513 client: SatXMPPClient, | |
1514 entity_jid: jid.JID | |
1515 ) -> Set[GPGPublicKey]: | 1518 ) -> Set[GPGPublicKey]: |
1516 """import all public keys of a JID that have not been imported before. | 1519 """import all public keys of a JID that have not been imported before. |
1517 | 1520 |
1518 @param client: The client. | 1521 @param client: The client. |
1519 @param jid: The JID. Can be a bare JID. | 1522 @param jid: The JID. Can be a bare JID. |
1533 if not public_keys_metadata: | 1536 if not public_keys_metadata: |
1534 public_keys_metadata = await self.download_public_keys_list( | 1537 public_keys_metadata = await self.download_public_keys_list( |
1535 client, entity_jid | 1538 client, entity_jid |
1536 ) | 1539 ) |
1537 if not public_keys_metadata: | 1540 if not public_keys_metadata: |
1538 raise exceptions.NotFound( | 1541 raise exceptions.NotFound(f"Can't find public keys for {entity_jid}") |
1539 f"Can't find public keys for {entity_jid}" | |
1540 ) | |
1541 else: | 1542 else: |
1542 await self.__storage[client.profile].aset( | 1543 await self.__storage[client.profile].aset( |
1543 storage_key, | 1544 storage_key, [pkm.to_dict() for pkm in public_keys_metadata] |
1544 [pkm.to_dict() for pkm in public_keys_metadata] | |
1545 ) | 1545 ) |
1546 | 1546 |
1547 | 1547 missing_keys = set( |
1548 missing_keys = set(filter(lambda public_key_metadata: all( | 1548 filter( |
1549 public_key_metadata.fingerprint != public_key.fingerprint | 1549 lambda public_key_metadata: all( |
1550 for public_key | 1550 public_key_metadata.fingerprint != public_key.fingerprint |
1551 in available_public_keys | 1551 for public_key in available_public_keys |
1552 ), public_keys_metadata)) | 1552 ), |
1553 public_keys_metadata, | |
1554 ) | |
1555 ) | |
1553 | 1556 |
1554 for missing_key in missing_keys: | 1557 for missing_key in missing_keys: |
1555 try: | 1558 try: |
1556 available_public_keys.add( | 1559 available_public_keys.add( |
1557 await self.import_public_key(client, entity_jid, missing_key.fingerprint) | 1560 await self.import_public_key( |
1561 client, entity_jid, missing_key.fingerprint | |
1562 ) | |
1558 ) | 1563 ) |
1559 except Exception as e: | 1564 except Exception as e: |
1560 log.warning( | 1565 log.warning( |
1561 f"import of public key {missing_key.fingerprint} owned by" | 1566 f"import of public key {missing_key.fingerprint} owned by" |
1562 f" {entity_jid.userhost()} failed, ignoring: {e}" | 1567 f" {entity_jid.userhost()} failed, ignoring: {e}" |
1563 ) | 1568 ) |
1564 | 1569 |
1565 return available_public_keys | 1570 return available_public_keys |
1566 | 1571 |
1567 async def import_public_key( | 1572 async def import_public_key( |
1568 self, | 1573 self, client: SatXMPPClient, jid: jid.JID, fingerprint: str |
1569 client: SatXMPPClient, | |
1570 jid: jid.JID, | |
1571 fingerprint: str | |
1572 ) -> GPGPublicKey: | 1574 ) -> GPGPublicKey: |
1573 """import a public key. | 1575 """import a public key. |
1574 | 1576 |
1575 @param client: The client. | 1577 @param client: The client. |
1576 @param jid: The JID owning the public key. Can be a bare JID. | 1578 @param jid: The JID owning the public key. Can be a bare JID. |
1587 | 1589 |
1588 node = f"urn:xmpp:openpgp:0:public-keys:{fingerprint}" | 1590 node = f"urn:xmpp:openpgp:0:public-keys:{fingerprint}" |
1589 | 1591 |
1590 try: | 1592 try: |
1591 items, __ = await self.__xep_0060.get_items( | 1593 items, __ = await self.__xep_0060.get_items( |
1592 client, | 1594 client, jid.userhostJID(), node, max_items=1 |
1593 jid.userhostJID(), | |
1594 node, | |
1595 max_items=1 | |
1596 ) | 1595 ) |
1597 except exceptions.NotFound as e: | 1596 except exceptions.NotFound as e: |
1598 raise exceptions.NotFound( | 1597 raise exceptions.NotFound( |
1599 f"No public key with fingerprint {fingerprint} published by JID" | 1598 f"No public key with fingerprint {fingerprint} published by JID" |
1600 f" {jid.userhost()}." | 1599 f" {jid.userhost()}." |
1609 f"No public key with fingerprint {fingerprint} published by JID" | 1608 f"No public key with fingerprint {fingerprint} published by JID" |
1610 f" {jid.userhost()}." | 1609 f" {jid.userhost()}." |
1611 ) from e | 1610 ) from e |
1612 | 1611 |
1613 pubkey_elt = cast( | 1612 pubkey_elt = cast( |
1614 Optional[domish.Element], | 1613 Optional[domish.Element], next(item_elt.elements(NS_OX, "pubkey"), None) |
1615 next(item_elt.elements(NS_OX, "pubkey"), None) | |
1616 ) | 1614 ) |
1617 | 1615 |
1618 if pubkey_elt is None: | 1616 if pubkey_elt is None: |
1619 raise exceptions.ParsingError( | 1617 raise exceptions.ParsingError( |
1620 f"Publish-Subscribe item of JID {jid.userhost()} doesn't contain pubkey" | 1618 f"Publish-Subscribe item of JID {jid.userhost()} doesn't contain pubkey" |
1627 raise exceptions.ParsingError( | 1625 raise exceptions.ParsingError( |
1628 f"Publish-Subscribe item of JID {jid.userhost()} doesn't pass pubkey" | 1626 f"Publish-Subscribe item of JID {jid.userhost()} doesn't pass pubkey" |
1629 f" schema validation." | 1627 f" schema validation." |
1630 ) from e | 1628 ) from e |
1631 | 1629 |
1632 public_key = gpg_provider.import_public_key(base64.b64decode(str( | 1630 public_key = gpg_provider.import_public_key( |
1633 next(pubkey_elt.elements(NS_OX, "data")) | 1631 base64.b64decode(str(next(pubkey_elt.elements(NS_OX, "data")))) |
1634 ))) | 1632 ) |
1635 | 1633 |
1636 return public_key | 1634 return public_key |
1637 | 1635 |
1638 async def publish_public_keys_list( | 1636 async def publish_public_keys_list( |
1639 self, | 1637 self, client: SatXMPPClient, public_keys_list: Iterable[PublicKeyMetadata] |
1640 client: SatXMPPClient, | |
1641 public_keys_list: Iterable[PublicKeyMetadata] | |
1642 ) -> None: | 1638 ) -> None: |
1643 """Publish/update the own public keys list. | 1639 """Publish/update the own public keys list. |
1644 | 1640 |
1645 @param client: The client. | 1641 @param client: The client. |
1646 @param public_keys_list: The public keys list. | 1642 @param public_keys_list: The public keys list. |
1648 | 1644 |
1649 @warning: All public keys referenced in the public keys list MUST be published | 1645 @warning: All public keys referenced in the public keys list MUST be published |
1650 beforehand. | 1646 beforehand. |
1651 """ | 1647 """ |
1652 | 1648 |
1653 if len({ pkm.fingerprint for pkm in public_keys_list }) != len(public_keys_list): | 1649 if len({pkm.fingerprint for pkm in public_keys_list}) != len(public_keys_list): |
1654 raise ValueError("Public keys list contains duplicate fingerprints.") | 1650 raise ValueError("Public keys list contains duplicate fingerprints.") |
1655 | 1651 |
1656 node = "urn:xmpp:openpgp:0:public-keys" | 1652 node = "urn:xmpp:openpgp:0:public-keys" |
1657 | 1653 |
1658 public_keys_list_elt = domish.Element((NS_OX, "public-keys-list")) | 1654 public_keys_list_elt = domish.Element((NS_OX, "public-keys-list")) |
1671 item_id=XEP_0060.ID_SINGLETON, | 1667 item_id=XEP_0060.ID_SINGLETON, |
1672 extra={ | 1668 extra={ |
1673 XEP_0060.EXTRA_PUBLISH_OPTIONS: { | 1669 XEP_0060.EXTRA_PUBLISH_OPTIONS: { |
1674 XEP_0060.OPT_PERSIST_ITEMS: "true", | 1670 XEP_0060.OPT_PERSIST_ITEMS: "true", |
1675 XEP_0060.OPT_ACCESS_MODEL: "open", | 1671 XEP_0060.OPT_ACCESS_MODEL: "open", |
1676 XEP_0060.OPT_MAX_ITEMS: 1 | 1672 XEP_0060.OPT_MAX_ITEMS: 1, |
1677 }, | 1673 }, |
1678 # TODO: Do we really want publish_without_options here? | 1674 # TODO: Do we really want publish_without_options here? |
1679 XEP_0060.EXTRA_ON_PRECOND_NOT_MET: "publish_without_options" | 1675 XEP_0060.EXTRA_ON_PRECOND_NOT_MET: "publish_without_options", |
1680 } | 1676 }, |
1681 ) | 1677 ) |
1682 except Exception as e: | 1678 except Exception as e: |
1683 raise XMPPInteractionFailed("Publishing the public keys list failed.") from e | 1679 raise XMPPInteractionFailed("Publishing the public keys list failed.") from e |
1684 | 1680 |
1685 async def download_public_keys_list( | 1681 async def download_public_keys_list( |
1686 self, | 1682 self, client: SatXMPPClient, jid: jid.JID |
1687 client: SatXMPPClient, | |
1688 jid: jid.JID | |
1689 ) -> Optional[Set[PublicKeyMetadata]]: | 1683 ) -> Optional[Set[PublicKeyMetadata]]: |
1690 """Download the public keys list of a JID. | 1684 """Download the public keys list of a JID. |
1691 | 1685 |
1692 @param client: The client. | 1686 @param client: The client. |
1693 @param jid: The JID. Can be a bare JID. | 1687 @param jid: The JID. Can be a bare JID. |
1699 | 1693 |
1700 node = "urn:xmpp:openpgp:0:public-keys" | 1694 node = "urn:xmpp:openpgp:0:public-keys" |
1701 | 1695 |
1702 try: | 1696 try: |
1703 items, __ = await self.__xep_0060.get_items( | 1697 items, __ = await self.__xep_0060.get_items( |
1704 client, | 1698 client, jid.userhostJID(), node, max_items=1 |
1705 jid.userhostJID(), | |
1706 node, | |
1707 max_items=1 | |
1708 ) | 1699 ) |
1709 except exceptions.NotFound: | 1700 except exceptions.NotFound: |
1710 return None | 1701 return None |
1711 except Exception as e: | 1702 except Exception as e: |
1712 raise XMPPInteractionFailed() from e | 1703 raise XMPPInteractionFailed() from e |
1716 except IndexError: | 1707 except IndexError: |
1717 return None | 1708 return None |
1718 | 1709 |
1719 public_keys_list_elt = cast( | 1710 public_keys_list_elt = cast( |
1720 Optional[domish.Element], | 1711 Optional[domish.Element], |
1721 next(item_elt.elements(NS_OX, "public-keys-list"), None) | 1712 next(item_elt.elements(NS_OX, "public-keys-list"), None), |
1722 ) | 1713 ) |
1723 | 1714 |
1724 if public_keys_list_elt is None: | 1715 if public_keys_list_elt is None: |
1725 return None | 1716 return None |
1726 | 1717 |
1733 ) from e | 1724 ) from e |
1734 | 1725 |
1735 return { | 1726 return { |
1736 PublicKeyMetadata( | 1727 PublicKeyMetadata( |
1737 fingerprint=pubkey_metadata_elt["v4-fingerprint"], | 1728 fingerprint=pubkey_metadata_elt["v4-fingerprint"], |
1738 timestamp=parse_datetime(pubkey_metadata_elt["date"]) | 1729 timestamp=parse_datetime(pubkey_metadata_elt["date"]), |
1739 ) | 1730 ) |
1740 for pubkey_metadata_elt | 1731 for pubkey_metadata_elt in public_keys_list_elt.elements( |
1741 in public_keys_list_elt.elements(NS_OX, "pubkey-metadata") | 1732 NS_OX, "pubkey-metadata" |
1733 ) | |
1742 } | 1734 } |
1743 | 1735 |
1744 async def __prepare_secret_key_synchronization( | 1736 async def __prepare_secret_key_synchronization( |
1745 self, | 1737 self, client: SatXMPPClient |
1746 client: SatXMPPClient | |
1747 ) -> Optional[domish.Element]: | 1738 ) -> Optional[domish.Element]: |
1748 """Prepare for secret key synchronization. | 1739 """Prepare for secret key synchronization. |
1749 | 1740 |
1750 Makes sure the relative protocols and protocol extensions are supported by the | 1741 Makes sure the relative protocols and protocol extensions are supported by the |
1751 server and makes sure that the PEP node for secret synchronization exists and is | 1742 server and makes sure that the PEP node for secret synchronization exists and is |
1758 protocols or protocol extensions. | 1749 protocols or protocol extensions. |
1759 @raise XMPPInteractionFailed: if any interaction via XMPP failed. | 1750 @raise XMPPInteractionFailed: if any interaction via XMPP failed. |
1760 """ | 1751 """ |
1761 | 1752 |
1762 try: | 1753 try: |
1763 infos = cast(DiscoInfo, await self.host.memory.disco.get_infos( | 1754 infos = cast( |
1764 client, | 1755 DiscoInfo, |
1765 client.jid.userhostJID() | 1756 await self.host.memory.disco.get_infos(client, client.jid.userhostJID()), |
1766 )) | 1757 ) |
1767 except Exception as e: | 1758 except Exception as e: |
1768 raise XMPPInteractionFailed( | 1759 raise XMPPInteractionFailed( |
1769 "Error performing service discovery on the own bare JID." | 1760 "Error performing service discovery on the own bare JID." |
1770 ) from e | 1761 ) from e |
1771 | 1762 |
1778 if "http://jabber.org/protocol/pubsub#access-whitelist" not in features: | 1769 if "http://jabber.org/protocol/pubsub#access-whitelist" not in features: |
1779 raise exceptions.FeatureNotFound( | 1770 raise exceptions.FeatureNotFound( |
1780 "Server doesn't support the whitelist access model." | 1771 "Server doesn't support the whitelist access model." |
1781 ) | 1772 ) |
1782 | 1773 |
1783 persistent_items_supported = \ | 1774 persistent_items_supported = ( |
1784 "http://jabber.org/protocol/pubsub#persistent-items" in features | 1775 "http://jabber.org/protocol/pubsub#persistent-items" in features |
1776 ) | |
1785 | 1777 |
1786 # TODO: persistent-items is a SHOULD, how do we handle the feature missing? | 1778 # TODO: persistent-items is a SHOULD, how do we handle the feature missing? |
1787 | 1779 |
1788 node = "urn:xmpp:openpgp:0:secret-key" | 1780 node = "urn:xmpp:openpgp:0:secret-key" |
1789 | 1781 |
1790 try: | 1782 try: |
1791 items, __ = await self.__xep_0060.get_items( | 1783 items, __ = await self.__xep_0060.get_items( |
1792 client, | 1784 client, client.jid.userhostJID(), node, max_items=1 |
1793 client.jid.userhostJID(), | |
1794 node, | |
1795 max_items=1 | |
1796 ) | 1785 ) |
1797 except exceptions.NotFound: | 1786 except exceptions.NotFound: |
1798 try: | 1787 try: |
1799 await self.__xep_0060.createNode( | 1788 await self.__xep_0060.createNode( |
1800 client, | 1789 client, |
1801 client.jid.userhostJID(), | 1790 client.jid.userhostJID(), |
1802 node, | 1791 node, |
1803 { | 1792 { |
1804 XEP_0060.OPT_PERSIST_ITEMS: "true", | 1793 XEP_0060.OPT_PERSIST_ITEMS: "true", |
1805 XEP_0060.OPT_ACCESS_MODEL: "whitelist", | 1794 XEP_0060.OPT_ACCESS_MODEL: "whitelist", |
1806 XEP_0060.OPT_MAX_ITEMS: "1" | 1795 XEP_0060.OPT_MAX_ITEMS: "1", |
1807 } | 1796 }, |
1808 ) | 1797 ) |
1809 except Exception as e: | 1798 except Exception as e: |
1810 raise XMPPInteractionFailed( | 1799 raise XMPPInteractionFailed( |
1811 "Error creating the secret key synchronization node." | 1800 "Error creating the secret key synchronization node." |
1812 ) from e | 1801 ) from e |
1819 return cast(domish.Element, items[0]) | 1808 return cast(domish.Element, items[0]) |
1820 except IndexError: | 1809 except IndexError: |
1821 return None | 1810 return None |
1822 | 1811 |
1823 async def export_secret_keys( | 1812 async def export_secret_keys( |
1824 self, | 1813 self, client: SatXMPPClient, secret_keys: Iterable[GPGSecretKey] |
1825 client: SatXMPPClient, | |
1826 secret_keys: Iterable[GPGSecretKey] | |
1827 ) -> str: | 1814 ) -> str: |
1828 """Export secret keys to synchronize them with other devices. | 1815 """Export secret keys to synchronize them with other devices. |
1829 | 1816 |
1830 @param client: The client. | 1817 @param client: The client. |
1831 @param secret_keys: The secret keys to export. | 1818 @param secret_keys: The secret keys to export. |
1852 secretkey_elt = domish.Element((NS_OX, "secretkey")) | 1839 secretkey_elt = domish.Element((NS_OX, "secretkey")) |
1853 secretkey_elt.addContent(base64.b64encode(ciphertext).decode("ASCII")) | 1840 secretkey_elt.addContent(base64.b64encode(ciphertext).decode("ASCII")) |
1854 | 1841 |
1855 try: | 1842 try: |
1856 await self.__xep_0060.send_item( | 1843 await self.__xep_0060.send_item( |
1857 client, | 1844 client, client.jid.userhostJID(), node, secretkey_elt |
1858 client.jid.userhostJID(), | |
1859 node, | |
1860 secretkey_elt | |
1861 ) | 1845 ) |
1862 except Exception as e: | 1846 except Exception as e: |
1863 raise XMPPInteractionFailed("Publishing the secret keys failed.") from e | 1847 raise XMPPInteractionFailed("Publishing the secret keys failed.") from e |
1864 | 1848 |
1865 return backup_code | 1849 return backup_code |
1883 item_elt = await self.__prepare_secret_key_synchronization(client) | 1867 item_elt = await self.__prepare_secret_key_synchronization(client) |
1884 if item_elt is None: | 1868 if item_elt is None: |
1885 return None | 1869 return None |
1886 | 1870 |
1887 secretkey_elt = cast( | 1871 secretkey_elt = cast( |
1888 Optional[domish.Element], | 1872 Optional[domish.Element], next(item_elt.elements(NS_OX, "secretkey"), None) |
1889 next(item_elt.elements(NS_OX, "secretkey"), None) | |
1890 ) | 1873 ) |
1891 | 1874 |
1892 if secretkey_elt is None: | 1875 if secretkey_elt is None: |
1893 return None | 1876 return None |
1894 | 1877 |
1900 ) from e | 1883 ) from e |
1901 | 1884 |
1902 return base64.b64decode(str(secretkey_elt)) | 1885 return base64.b64decode(str(secretkey_elt)) |
1903 | 1886 |
1904 def import_secret_keys( | 1887 def import_secret_keys( |
1905 self, | 1888 self, client: SatXMPPClient, ciphertext: bytes, backup_code: str |
1906 client: SatXMPPClient, | |
1907 ciphertext: bytes, | |
1908 backup_code: str | |
1909 ) -> Set[GPGSecretKey]: | 1889 ) -> Set[GPGSecretKey]: |
1910 """import previously downloaded secret keys. | 1890 """import previously downloaded secret keys. |
1911 | 1891 |
1912 The downloading and importing steps are separate since a backup code is required | 1892 The downloading and importing steps are separate since a backup code is required |
1913 for the import and it should be possible to try multiple backup codes without | 1893 for the import and it should be possible to try multiple backup codes without |
1923 @raise DecryptionFailed: on decryption failure. | 1903 @raise DecryptionFailed: on decryption failure. |
1924 """ | 1904 """ |
1925 | 1905 |
1926 gpg_provider = get_gpg_provider(self.host, client) | 1906 gpg_provider = get_gpg_provider(self.host, client) |
1927 | 1907 |
1928 return gpg_provider.restore_secret_keys(gpg_provider.decrypt_symmetrically( | 1908 return gpg_provider.restore_secret_keys( |
1929 ciphertext, | 1909 gpg_provider.decrypt_symmetrically(ciphertext, backup_code) |
1930 backup_code | 1910 ) |
1931 )) | |
1932 | 1911 |
1933 @staticmethod | 1912 @staticmethod |
1934 def __get_joined_muc_users( | 1913 def __get_joined_muc_users( |
1935 client: SatXMPPClient, | 1914 client: SatXMPPClient, xep_0045: XEP_0045, room_jid: jid.JID |
1936 xep_0045: XEP_0045, | |
1937 room_jid: jid.JID | |
1938 ) -> Set[jid.JID]: | 1915 ) -> Set[jid.JID]: |
1939 """ | 1916 """ |
1940 @param client: The client. | 1917 @param client: The client. |
1941 @param xep_0045: A MUC plugin instance. | 1918 @param xep_0045: A MUC plugin instance. |
1942 @param room_jid: The room JID. | 1919 @param room_jid: The room JID. |
1966 bare_jids.add(entity.jid.userhostJID()) | 1943 bare_jids.add(entity.jid.userhostJID()) |
1967 | 1944 |
1968 return bare_jids | 1945 return bare_jids |
1969 | 1946 |
1970 async def get_trust( | 1947 async def get_trust( |
1971 self, | 1948 self, client: SatXMPPClient, public_key: GPGPublicKey, owner: jid.JID |
1972 client: SatXMPPClient, | |
1973 public_key: GPGPublicKey, | |
1974 owner: jid.JID | |
1975 ) -> TrustLevel: | 1949 ) -> TrustLevel: |
1976 """Query the trust level of a public key. | 1950 """Query the trust level of a public key. |
1977 | 1951 |
1978 @param client: The client to perform this operation under. | 1952 @param client: The client to perform this operation under. |
1979 @param public_key: The public key. | 1953 @param public_key: The public key. |
1991 async def set_trust( | 1965 async def set_trust( |
1992 self, | 1966 self, |
1993 client: SatXMPPClient, | 1967 client: SatXMPPClient, |
1994 public_key: GPGPublicKey, | 1968 public_key: GPGPublicKey, |
1995 owner: jid.JID, | 1969 owner: jid.JID, |
1996 trust_level: TrustLevel | 1970 trust_level: TrustLevel, |
1997 ) -> None: | 1971 ) -> None: |
1998 """Set the trust level of a public key. | 1972 """Set the trust level of a public key. |
1999 | 1973 |
2000 @param client: The client to perform this operation under. | 1974 @param client: The client to perform this operation under. |
2001 @param public_key: The public key. | 1975 @param public_key: The public key. |
2006 key = f"/trust/{owner.userhost()}/{public_key.fingerprint}" | 1980 key = f"/trust/{owner.userhost()}/{public_key.fingerprint}" |
2007 | 1981 |
2008 await self.__storage[client.profile].force(key, trust_level.name) | 1982 await self.__storage[client.profile].force(key, trust_level.name) |
2009 | 1983 |
2010 async def get_trust_ui( # pylint: disable=invalid-name | 1984 async def get_trust_ui( # pylint: disable=invalid-name |
2011 self, | 1985 self, client: SatXMPPClient, entity: jid.JID |
2012 client: SatXMPPClient, | |
2013 entity: jid.JID | |
2014 ) -> xml_tools.XMLUI: | 1986 ) -> xml_tools.XMLUI: |
2015 """ | 1987 """ |
2016 @param client: The client. | 1988 @param client: The client. |
2017 @param entity: The entity whose device trust levels to manage. | 1989 @param entity: The entity whose device trust levels to manage. |
2018 @return: An XMLUI instance which opens a form to manage the trust level of all | 1990 @return: An XMLUI instance which opens a form to manage the trust level of all |
2024 | 1996 |
2025 bare_jids: Set[jid.JID] | 1997 bare_jids: Set[jid.JID] |
2026 if self.__xep_0045 is not None and self.__xep_0045.is_joined_room(client, entity): | 1998 if self.__xep_0045 is not None and self.__xep_0045.is_joined_room(client, entity): |
2027 bare_jids = self.__get_joined_muc_users(client, self.__xep_0045, entity) | 1999 bare_jids = self.__get_joined_muc_users(client, self.__xep_0045, entity) |
2028 else: | 2000 else: |
2029 bare_jids = { entity.userhostJID() } | 2001 bare_jids = {entity.userhostJID()} |
2030 | 2002 |
2031 all_public_keys = list({ | 2003 all_public_keys = list( |
2032 bare_jid: list(self.list_public_keys(client, bare_jid)) | 2004 { |
2033 for bare_jid | 2005 bare_jid: list(self.list_public_keys(client, bare_jid)) |
2034 in bare_jids | 2006 for bare_jid in bare_jids |
2035 }.items()) | 2007 }.items() |
2008 ) | |
2036 | 2009 |
2037 async def callback( | 2010 async def callback( |
2038 data: Any, | 2011 data: Any, profile: str # pylint: disable=unused-argument |
2039 profile: str # pylint: disable=unused-argument | |
2040 ) -> Dict[Never, Never]: | 2012 ) -> Dict[Never, Never]: |
2041 """ | 2013 """ |
2042 @param data: The XMLUI result produces by the trust UI form. | 2014 @param data: The XMLUI result produces by the trust UI form. |
2043 @param profile: The profile. | 2015 @param profile: The profile. |
2044 @return: An empty dictionary. The type of the return value was chosen | 2016 @return: An empty dictionary. The type of the return value was chosen |
2047 | 2019 |
2048 if C.bool(data.get("cancelled", "false")): | 2020 if C.bool(data.get("cancelled", "false")): |
2049 return {} | 2021 return {} |
2050 | 2022 |
2051 data_form_result = cast( | 2023 data_form_result = cast( |
2052 Dict[str, str], | 2024 Dict[str, str], xml_tools.xmlui_result_2_data_form_result(data) |
2053 xml_tools.xmlui_result_2_data_form_result(data) | |
2054 ) | 2025 ) |
2055 for key, value in data_form_result.items(): | 2026 for key, value in data_form_result.items(): |
2056 if not key.startswith("trust_"): | 2027 if not key.startswith("trust_"): |
2057 continue | 2028 continue |
2058 | 2029 |
2068 return {} | 2039 return {} |
2069 | 2040 |
2070 submit_id = self.host.register_callback(callback, with_data=True, one_shot=True) | 2041 submit_id = self.host.register_callback(callback, with_data=True, one_shot=True) |
2071 | 2042 |
2072 result = xml_tools.XMLUI( | 2043 result = xml_tools.XMLUI( |
2073 panel_type=C.XMLUI_FORM, | 2044 panel_type=C.XMLUI_FORM, title=D_("OX trust management"), submit_id=submit_id |
2074 title=D_("OX trust management"), | |
2075 submit_id=submit_id | |
2076 ) | 2045 ) |
2077 # Casting this to Any, otherwise all calls on the variable cause type errors | 2046 # Casting this to Any, otherwise all calls on the variable cause type errors |
2078 # pylint: disable=no-member | 2047 # pylint: disable=no-member |
2079 trust_ui = cast(Any, result) | 2048 trust_ui = cast(Any, result) |
2080 trust_ui.addText(D_( | 2049 trust_ui.addText( |
2081 "This is OX trusting system. You'll see below the GPG keys of your " | 2050 D_( |
2082 "contacts, and a list selection to trust them or not. A trusted key " | 2051 "This is OX trusting system. You'll see below the GPG keys of your " |
2083 "can read your messages in plain text, so be sure to only validate " | 2052 "contacts, and a list selection to trust them or not. A trusted key " |
2084 "keys that you are sure are belonging to your contact. It's better " | 2053 "can read your messages in plain text, so be sure to only validate " |
2085 "to do this when you are next to your contact, so " | 2054 "keys that you are sure are belonging to your contact. It's better " |
2086 "you can check the \"fingerprint\" of the key " | 2055 "to do this when you are next to your contact, so " |
2087 "yourself. Do *not* validate a key if the fingerprint is wrong!" | 2056 'you can check the "fingerprint" of the key ' |
2088 )) | 2057 "yourself. Do *not* validate a key if the fingerprint is wrong!" |
2058 ) | |
2059 ) | |
2089 | 2060 |
2090 own_secret_keys = self.list_secret_keys(client) | 2061 own_secret_keys = self.list_secret_keys(client) |
2091 | 2062 |
2092 trust_ui.change_container("label") | 2063 trust_ui.change_container("label") |
2093 for index, secret_key in enumerate(own_secret_keys): | 2064 for index, secret_key in enumerate(own_secret_keys): |
2094 trust_ui.addLabel(D_(f"Own secret key {index} fingerprint")) | 2065 trust_ui.addLabel(D_(f"Own secret key {index} fingerprint")) |
2095 trust_ui.addText(secret_key.public_key.fingerprint) | 2066 trust_ui.addText(secret_key.public_key.fingerprint) |
2096 trust_ui.addEmpty() | 2067 trust_ui.addEmpty() |
2097 trust_ui.addEmpty() | 2068 trust_ui.addEmpty() |
2098 | 2069 |
2099 for outer_index, [ owner, public_keys ] in enumerate(all_public_keys): | 2070 for outer_index, [owner, public_keys] in enumerate(all_public_keys): |
2100 for inner_index, public_key in enumerate(public_keys): | 2071 for inner_index, public_key in enumerate(public_keys): |
2101 trust_ui.addLabel(D_("Contact")) | 2072 trust_ui.addLabel(D_("Contact")) |
2102 trust_ui.addJid(jid.JID(owner)) | 2073 trust_ui.addJid(jid.JID(owner)) |
2103 trust_ui.addLabel(D_("Fingerprint")) | 2074 trust_ui.addLabel(D_("Fingerprint")) |
2104 trust_ui.addText(public_key.fingerprint) | 2075 trust_ui.addText(public_key.fingerprint) |
2105 trust_ui.addLabel(D_("Trust this device?")) | 2076 trust_ui.addLabel(D_("Trust this device?")) |
2106 | 2077 |
2107 current_trust_level = await self.get_trust(client, public_key, owner) | 2078 current_trust_level = await self.get_trust(client, public_key, owner) |
2108 avaiable_trust_levels = \ | 2079 avaiable_trust_levels = { |
2109 { TrustLevel.DISTRUSTED, TrustLevel.TRUSTED, current_trust_level } | 2080 TrustLevel.DISTRUSTED, |
2081 TrustLevel.TRUSTED, | |
2082 current_trust_level, | |
2083 } | |
2110 | 2084 |
2111 trust_ui.addList( | 2085 trust_ui.addList( |
2112 f"trust_{outer_index}_{inner_index}", | 2086 f"trust_{outer_index}_{inner_index}", |
2113 options=[ trust_level.name for trust_level in avaiable_trust_levels ], | 2087 options=[trust_level.name for trust_level in avaiable_trust_levels], |
2114 selected=current_trust_level.name, | 2088 selected=current_trust_level.name, |
2115 styles=[ "inline" ] | 2089 styles=["inline"], |
2116 ) | 2090 ) |
2117 | 2091 |
2118 trust_ui.addEmpty() | 2092 trust_ui.addEmpty() |
2119 trust_ui.addEmpty() | 2093 trust_ui.addEmpty() |
2120 | 2094 |