comparison libervia/backend/plugins/plugin_xep_0373.py @ 4270:0d7bb4df2343

Reformatted code base using black.
author Goffi <goffi@goffi.org>
date Wed, 19 Jun 2024 18:44:57 +0200
parents b53b6dc1f929
children
comparison
equal deleted inserted replaced
4269:64a85ce8be70 4270:0d7bb4df2343
73 "GPGPublicKey", 73 "GPGPublicKey",
74 "GPGSecretKey", 74 "GPGSecretKey",
75 "GPGProvider", 75 "GPGProvider",
76 "PublicKeyMetadata", 76 "PublicKeyMetadata",
77 "gpg_provider", 77 "gpg_provider",
78 "TrustLevel" 78 "TrustLevel",
79 ] 79 ]
80 80
81 81
82 log = cast(Logger, getLogger(__name__)) # type: ignore[no-untyped-call] 82 log = cast(Logger, getLogger(__name__)) # type: ignore[no-untyped-call]
83 83
84 84
85 PLUGIN_INFO = { 85 PLUGIN_INFO = {
86 C.PI_NAME: "XEP-0373", 86 C.PI_NAME: "XEP-0373",
87 C.PI_IMPORT_NAME: "XEP-0373", 87 C.PI_IMPORT_NAME: "XEP-0373",
88 C.PI_TYPE: "SEC", 88 C.PI_TYPE: "SEC",
89 C.PI_PROTOCOLS: [ "XEP-0373" ], 89 C.PI_PROTOCOLS: ["XEP-0373"],
90 C.PI_DEPENDENCIES: [ "XEP-0060", "XEP-0163" ], 90 C.PI_DEPENDENCIES: ["XEP-0060", "XEP-0163"],
91 C.PI_RECOMMENDATIONS: [], 91 C.PI_RECOMMENDATIONS: [],
92 C.PI_MAIN: "XEP_0373", 92 C.PI_MAIN: "XEP_0373",
93 C.PI_HANDLER: "no", 93 C.PI_HANDLER: "no",
94 C.PI_DESCRIPTION: D_("Implementation of OpenPGP for XMPP"), 94 C.PI_DESCRIPTION: D_("Implementation of OpenPGP for XMPP"),
95 } 95 }
304 of the public keys is available. 304 of the public keys is available.
305 """ 305 """
306 306
307 @abstractmethod 307 @abstractmethod
308 def verify_detached( 308 def verify_detached(
309 self, 309 self, data: bytes, signature: bytes, public_keys: Set[GPGPublicKey]
310 data: bytes,
311 signature: bytes,
312 public_keys: Set[GPGPublicKey]
313 ) -> None: 310 ) -> None:
314 """Verify signed data, where the signature was created detached from the data. 311 """Verify signed data, where the signature was created detached from the data.
315 312
316 OpenPGP's ASCII Armor is not used. 313 OpenPGP's ASCII Armor is not used.
317 314
327 @abstractmethod 324 @abstractmethod
328 def encrypt( 325 def encrypt(
329 self, 326 self,
330 plaintext: bytes, 327 plaintext: bytes,
331 public_keys: Set[GPGPublicKey], 328 public_keys: Set[GPGPublicKey],
332 signing_keys: Optional[Set[GPGSecretKey]] = None 329 signing_keys: Optional[Set[GPGSecretKey]] = None,
333 ) -> bytes: 330 ) -> bytes:
334 """Encrypt and optionally sign some data. 331 """Encrypt and optionally sign some data.
335 332
336 OpenPGP's ASCII Armor is not used. 333 OpenPGP's ASCII Armor is not used.
337 334
344 @abstractmethod 341 @abstractmethod
345 def decrypt( 342 def decrypt(
346 self, 343 self,
347 ciphertext: bytes, 344 ciphertext: bytes,
348 secret_keys: Set[GPGSecretKey], 345 secret_keys: Set[GPGSecretKey],
349 public_keys: Optional[Set[GPGPublicKey]] = None 346 public_keys: Optional[Set[GPGPublicKey]] = None,
350 ) -> bytes: 347 ) -> bytes:
351 """Decrypt and optionally verify some data. 348 """Decrypt and optionally verify some data.
352 349
353 OpenPGP's ASCII Armor is not used. 350 OpenPGP's ASCII Armor is not used.
354 351
567 564
568 def decrypt_symmetrically(self, ciphertext: bytes, password: str) -> bytes: 565 def decrypt_symmetrically(self, ciphertext: bytes, password: str) -> bytes:
569 with gpg.Context(home_dir=self.__home_dir) as c: 566 with gpg.Context(home_dir=self.__home_dir) as c:
570 try: 567 try:
571 plaintext, __, __ = c.decrypt( 568 plaintext, __, __ = c.decrypt(
572 ciphertext, 569 ciphertext, passphrase=password, verify=False
573 passphrase=password,
574 verify=False
575 ) 570 )
576 except gpg.errors.GPGMEError as e: 571 except gpg.errors.GPGMEError as e:
577 # TODO: Find out what kind of error is raised if the password is wrong and 572 # TODO: Find out what kind of error is raised if the password is wrong and
578 # re-raise it as DecryptionFailed instead. 573 # re-raise it as DecryptionFailed instead.
579 raise GPGProviderError("Internal GPGME error") from e 574 raise GPGProviderError("Internal GPGME error") from e
644 ) 639 )
645 640
646 return data 641 return data
647 642
648 def verify_detached( 643 def verify_detached(
649 self, 644 self, data: bytes, signature: bytes, public_keys: Set[GPGPublicKey]
650 data: bytes,
651 signature: bytes,
652 public_keys: Set[GPGPublicKey]
653 ) -> None: 645 ) -> None:
654 with gpg.Context(home_dir=self.__home_dir) as c: 646 with gpg.Context(home_dir=self.__home_dir) as c:
655 try: 647 try:
656 __, result = c.verify(data, signature=signature) 648 __, result = c.verify(data, signature=signature)
657 except gpg.errors.GPGMEError as e: 649 except gpg.errors.GPGMEError as e:
675 667
676 def encrypt( 668 def encrypt(
677 self, 669 self,
678 plaintext: bytes, 670 plaintext: bytes,
679 public_keys: Set[GPGPublicKey], 671 public_keys: Set[GPGPublicKey],
680 signing_keys: Optional[Set[GPGSecretKey]] = None 672 signing_keys: Optional[Set[GPGSecretKey]] = None,
681 ) -> bytes: 673 ) -> bytes:
682 recipients = [] 674 recipients = []
683 for public_key in public_keys: 675 for public_key in public_keys:
684 assert isinstance(public_key, GPGME_GPGPublicKey) 676 assert isinstance(public_key, GPGME_GPGPublicKey)
685 677
699 ciphertext, __, __ = c.encrypt( 691 ciphertext, __, __ = c.encrypt(
700 plaintext, 692 plaintext,
701 recipients=recipients, 693 recipients=recipients,
702 sign=sign, 694 sign=sign,
703 always_trust=True, 695 always_trust=True,
704 add_encrypt_to=True 696 add_encrypt_to=True,
705 ) 697 )
706 except gpg.errors.GPGMEError as e: 698 except gpg.errors.GPGMEError as e:
707 raise GPGProviderError("Internal GPGME error") from e 699 raise GPGProviderError("Internal GPGME error") from e
708 except gpg.errors.InvalidRecipients as e: 700 except gpg.errors.InvalidRecipients as e:
709 raise GPGProviderError( 701 raise GPGProviderError(
718 710
719 def decrypt( 711 def decrypt(
720 self, 712 self,
721 ciphertext: bytes, 713 ciphertext: bytes,
722 secret_keys: Set[GPGSecretKey], 714 secret_keys: Set[GPGSecretKey],
723 public_keys: Optional[Set[GPGPublicKey]] = None 715 public_keys: Optional[Set[GPGPublicKey]] = None,
724 ) -> bytes: 716 ) -> bytes:
725 verify = public_keys is not None 717 verify = public_keys is not None
726 718
727 with gpg.Context(home_dir=self.__home_dir) as c: 719 with gpg.Context(home_dir=self.__home_dir) as c:
728 try: 720 try:
729 plaintext, result, verify_result = c.decrypt( 721 plaintext, result, verify_result = c.decrypt(ciphertext, verify=verify)
730 ciphertext,
731 verify=verify
732 )
733 except gpg.errors.GPGMEError as e: 722 except gpg.errors.GPGMEError as e:
734 raise GPGProviderError("Internal GPGME error") from e 723 raise GPGProviderError("Internal GPGME error") from e
735 except gpg.UnsupportedAlgorithm as e: 724 except gpg.UnsupportedAlgorithm as e:
736 raise DecryptionFailed("Unsupported algorithm") from e 725 raise DecryptionFailed("Unsupported algorithm") from e
737 726
758 def list_public_keys(self, user_id: str) -> Set[GPGPublicKey]: 747 def list_public_keys(self, user_id: str) -> Set[GPGPublicKey]:
759 with gpg.Context(home_dir=self.__home_dir) as c: 748 with gpg.Context(home_dir=self.__home_dir) as c:
760 try: 749 try:
761 return { 750 return {
762 GPGME_GPGPublicKey(key) 751 GPGME_GPGPublicKey(key)
763 for key 752 for key in c.keylist(pattern=user_id, secret=False)
764 in c.keylist(pattern=user_id, secret=False)
765 } 753 }
766 except gpg.errors.GPGMEError as e: 754 except gpg.errors.GPGMEError as e:
767 raise GPGProviderError("Internal GPGME error") from e 755 raise GPGProviderError("Internal GPGME error") from e
768 756
769 def list_secret_keys(self, user_id: str) -> Set[GPGSecretKey]: 757 def list_secret_keys(self, user_id: str) -> Set[GPGSecretKey]:
770 with gpg.Context(home_dir=self.__home_dir) as c: 758 with gpg.Context(home_dir=self.__home_dir) as c:
771 try: 759 try:
772 return { 760 return {
773 GPGME_GPGSecretKey(GPGME_GPGPublicKey(key)) 761 GPGME_GPGSecretKey(GPGME_GPGPublicKey(key))
774 for key 762 for key in c.keylist(pattern=user_id, secret=True)
775 in c.keylist(pattern=user_id, secret=True)
776 } 763 }
777 except gpg.errors.GPGMEError as e: 764 except gpg.errors.GPGMEError as e:
778 raise GPGProviderError("Internal GPGME error") from e 765 raise GPGProviderError("Internal GPGME error") from e
779 766
780 def can_sign(self, public_key: GPGPublicKey) -> bool: 767 def can_sign(self, public_key: GPGPublicKey) -> bool:
795 expires=False, 782 expires=False,
796 sign=True, 783 sign=True,
797 encrypt=True, 784 encrypt=True,
798 certify=False, 785 certify=False,
799 authenticate=False, 786 authenticate=False,
800 force=True 787 force=True,
801 ) 788 )
802 789
803 key_obj = c.get_key(result.fpr, secret=True) 790 key_obj = c.get_key(result.fpr, secret=True)
804 except gpg.errors.GPGMEError as e: 791 except gpg.errors.GPGMEError as e:
805 raise GPGProviderError("Internal GPGME error") from e 792 raise GPGProviderError("Internal GPGME error") from e
811 798
812 class PublicKeyMetadata(NamedTuple): 799 class PublicKeyMetadata(NamedTuple):
813 """ 800 """
814 Metadata about a published public key. 801 Metadata about a published public key.
815 """ 802 """
803
816 fingerprint: str 804 fingerprint: str
817 timestamp: datetime 805 timestamp: datetime
818 806
819 def to_dict(self) -> dict: 807 def to_dict(self) -> dict:
820 # Convert the instance to a dictionary and handle datetime serialization 808 # Convert the instance to a dictionary and handle datetime serialization
821 data = self._asdict() 809 data = self._asdict()
822 data['timestamp'] = self.timestamp.isoformat() 810 data["timestamp"] = self.timestamp.isoformat()
823 return data 811 return data
824 812
825 @staticmethod 813 @staticmethod
826 def from_dict(data: dict) -> 'PublicKeyMetadata': 814 def from_dict(data: dict) -> "PublicKeyMetadata":
827 # Load a serialised dictionary 815 # Load a serialised dictionary
828 data['timestamp'] = datetime.fromisoformat(data['timestamp']) 816 data["timestamp"] = datetime.fromisoformat(data["timestamp"])
829 return PublicKeyMetadata(**data) 817 return PublicKeyMetadata(**data)
830 818
831 819
832 @enum.unique 820 @enum.unique
833 class TrustLevel(enum.Enum): 821 class TrustLevel(enum.Enum):
839 BLINDLY_TRUSTED: str = "BLINDLY_TRUSTED" 827 BLINDLY_TRUSTED: str = "BLINDLY_TRUSTED"
840 UNDECIDED: str = "UNDECIDED" 828 UNDECIDED: str = "UNDECIDED"
841 DISTRUSTED: str = "DISTRUSTED" 829 DISTRUSTED: str = "DISTRUSTED"
842 830
843 831
844 OPENPGP_SCHEMA = xmlschema.XMLSchema("""<?xml version="1.0" encoding="utf8"?> 832 OPENPGP_SCHEMA = xmlschema.XMLSchema(
833 """<?xml version="1.0" encoding="utf8"?>
845 <xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema" 834 <xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema"
846 targetNamespace="urn:xmpp:openpgp:0" 835 targetNamespace="urn:xmpp:openpgp:0"
847 xmlns="urn:xmpp:openpgp:0"> 836 xmlns="urn:xmpp:openpgp:0">
848 837
849 <xs:element name="openpgp" type="xs:base64Binary"/> 838 <xs:element name="openpgp" type="xs:base64Binary"/>
850 </xs:schema> 839 </xs:schema>
851 """) 840 """
841 )
852 842
853 843
854 # The following schema needs version 1.1 of XML Schema, which is not supported by lxml. 844 # The following schema needs version 1.1 of XML Schema, which is not supported by lxml.
855 # Luckily, xmlschema exists, which is a clean, well maintained, cross-platform 845 # Luckily, xmlschema exists, which is a clean, well maintained, cross-platform
856 # implementation of XML Schema, including version 1.1. 846 # implementation of XML Schema, including version 1.1.
857 CONTENT_SCHEMA = xmlschema.XMLSchema11("""<?xml version="1.1" encoding="utf8"?> 847 CONTENT_SCHEMA = xmlschema.XMLSchema11(
848 """<?xml version="1.1" encoding="utf8"?>
858 <xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema" 849 <xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema"
859 targetNamespace="urn:xmpp:openpgp:0" 850 targetNamespace="urn:xmpp:openpgp:0"
860 xmlns="urn:xmpp:openpgp:0"> 851 xmlns="urn:xmpp:openpgp:0">
861 852
862 <xs:element name="signcrypt"> 853 <xs:element name="signcrypt">
912 <xs:any minOccurs="0" maxOccurs="unbounded" processContents="skip"/> 903 <xs:any minOccurs="0" maxOccurs="unbounded" processContents="skip"/>
913 </xs:sequence> 904 </xs:sequence>
914 </xs:complexType> 905 </xs:complexType>
915 </xs:element> 906 </xs:element>
916 </xs:schema> 907 </xs:schema>
917 """) 908 """
909 )
918 910
919 911
920 PUBLIC_KEYS_LIST_NODE = "urn:xmpp:openpgp:0:public-keys" 912 PUBLIC_KEYS_LIST_NODE = "urn:xmpp:openpgp:0:public-keys"
921 PUBLIC_KEYS_LIST_SCHEMA = xmlschema.XMLSchema("""<?xml version="1.0" encoding="utf8"?> 913 PUBLIC_KEYS_LIST_SCHEMA = xmlschema.XMLSchema(
914 """<?xml version="1.0" encoding="utf8"?>
922 <xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema" 915 <xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema"
923 targetNamespace="urn:xmpp:openpgp:0" 916 targetNamespace="urn:xmpp:openpgp:0"
924 xmlns="urn:xmpp:openpgp:0"> 917 xmlns="urn:xmpp:openpgp:0">
925 918
926 <xs:element name="public-keys-list"> 919 <xs:element name="public-keys-list">
936 <xs:attribute name="v4-fingerprint" type="xs:string"/> 929 <xs:attribute name="v4-fingerprint" type="xs:string"/>
937 <xs:attribute name="date" type="xs:dateTime"/> 930 <xs:attribute name="date" type="xs:dateTime"/>
938 </xs:complexType> 931 </xs:complexType>
939 </xs:element> 932 </xs:element>
940 </xs:schema> 933 </xs:schema>
941 """) 934 """
942 935 )
943 936
944 PUBKEY_SCHEMA = xmlschema.XMLSchema("""<?xml version="1.0" encoding="utf8"?> 937
938 PUBKEY_SCHEMA = xmlschema.XMLSchema(
939 """<?xml version="1.0" encoding="utf8"?>
945 <xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema" 940 <xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema"
946 targetNamespace="urn:xmpp:openpgp:0" 941 targetNamespace="urn:xmpp:openpgp:0"
947 xmlns="urn:xmpp:openpgp:0"> 942 xmlns="urn:xmpp:openpgp:0">
948 943
949 <xs:element name="pubkey"> 944 <xs:element name="pubkey">
955 </xs:complexType> 950 </xs:complexType>
956 </xs:element> 951 </xs:element>
957 952
958 <xs:element name="data" type="xs:base64Binary"/> 953 <xs:element name="data" type="xs:base64Binary"/>
959 </xs:schema> 954 </xs:schema>
960 """) 955 """
961 956 )
962 957
963 SECRETKEY_SCHEMA = xmlschema.XMLSchema("""<?xml version="1.0" encoding="utf8"?> 958
959 SECRETKEY_SCHEMA = xmlschema.XMLSchema(
960 """<?xml version="1.0" encoding="utf8"?>
964 <xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema" 961 <xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema"
965 targetNamespace="urn:xmpp:openpgp:0" 962 targetNamespace="urn:xmpp:openpgp:0"
966 xmlns="urn:xmpp:openpgp:0"> 963 xmlns="urn:xmpp:openpgp:0">
967 964
968 <xs:element name="secretkey" type="xs:base64Binary"/> 965 <xs:element name="secretkey" type="xs:base64Binary"/>
969 </xs:schema> 966 </xs:schema>
970 """) 967 """
968 )
971 969
972 970
973 DEFAULT_TRUST_MODEL_PARAM = f""" 971 DEFAULT_TRUST_MODEL_PARAM = f"""
974 <params> 972 <params>
975 <individual> 973 <individual>
1003 """Generate a secure passphrase for symmetric encryption. 1001 """Generate a secure passphrase for symmetric encryption.
1004 1002
1005 @return: The passphrase. 1003 @return: The passphrase.
1006 """ 1004 """
1007 1005
1008 return "-".join("".join( 1006 return "-".join(
1009 secrets.choice("123456789ABCDEFGHIJKLMNPQRSTUVWXYZ") for __ in range(4) 1007 "".join(secrets.choice("123456789ABCDEFGHIJKLMNPQRSTUVWXYZ") for __ in range(4))
1010 ) for __ in range(6)) 1008 for __ in range(6)
1009 )
1011 1010
1012 1011
1013 # TODO: Handle the user id mess 1012 # TODO: Handle the user id mess
1014 class XEP_0373: 1013 class XEP_0373:
1015 """ 1014 """
1036 xep_0163.add_pep_event( 1035 xep_0163.add_pep_event(
1037 "OX_PUBLIC_KEYS_LIST", 1036 "OX_PUBLIC_KEYS_LIST",
1038 PUBLIC_KEYS_LIST_NODE, 1037 PUBLIC_KEYS_LIST_NODE,
1039 lambda items_event, profile: defer.ensureDeferred( 1038 lambda items_event, profile: defer.ensureDeferred(
1040 self.__on_public_keys_list_update(items_event, profile) 1039 self.__on_public_keys_list_update(items_event, profile)
1041 ) 1040 ),
1042 ) 1041 )
1043 1042
1044 async def profile_connecting(self, client): 1043 async def profile_connecting(self, client):
1045 client.gpg_provider = get_gpg_provider(self.host, client) 1044 client.gpg_provider = get_gpg_provider(self.host, client)
1046 1045
1047 async def profile_connected( # pylint: disable=invalid-name 1046 async def profile_connected( # pylint: disable=invalid-name
1048 self, 1047 self, client: SatXMPPClient
1049 client: SatXMPPClient
1050 ) -> None: 1048 ) -> None:
1051 """ 1049 """
1052 @param client: The client. 1050 @param client: The client.
1053 """ 1051 """
1054 1052
1055 profile = cast(str, client.profile) 1053 profile = cast(str, client.profile)
1056 1054
1057 if not profile in self.__storage: 1055 if not profile in self.__storage:
1058 self.__storage[profile] = \ 1056 self.__storage[profile] = persistent.LazyPersistentBinaryDict(
1059 persistent.LazyPersistentBinaryDict("XEP-0373", client.profile) 1057 "XEP-0373", client.profile
1058 )
1060 1059
1061 if len(self.list_secret_keys(client)) == 0: 1060 if len(self.list_secret_keys(client)) == 0:
1062 log.debug(f"Generating first GPG key for {client.jid.userhost()}.") 1061 log.debug(f"Generating first GPG key for {client.jid.userhost()}.")
1063 await self.create_key(client) 1062 await self.create_key(client)
1064 1063
1065 async def __on_public_keys_list_update( 1064 async def __on_public_keys_list_update(
1066 self, 1065 self, items_event: pubsub.ItemsEvent, profile: str
1067 items_event: pubsub.ItemsEvent,
1068 profile: str
1069 ) -> None: 1066 ) -> None:
1070 """Handle public keys list updates fired by PEP. 1067 """Handle public keys list updates fired by PEP.
1071 1068
1072 @param items_event: The event. 1069 @param items_event: The event.
1073 @param profile: The profile this event belongs to. 1070 @param profile: The profile this event belongs to.
1087 log.debug("Ignoring empty public keys list update.") 1084 log.debug("Ignoring empty public keys list update.")
1088 return 1085 return
1089 1086
1090 public_keys_list_elt = cast( 1087 public_keys_list_elt = cast(
1091 Optional[domish.Element], 1088 Optional[domish.Element],
1092 next(item_elt.elements(NS_OX, "public-keys-list"), None) 1089 next(item_elt.elements(NS_OX, "public-keys-list"), None),
1093 ) 1090 )
1094 1091
1095 pubkey_metadata_elts: Optional[List[domish.Element]] = None 1092 pubkey_metadata_elts: Optional[List[domish.Element]] = None
1096 1093
1097 if public_keys_list_elt is not None: 1094 if public_keys_list_elt is not None:
1098 try: 1095 try:
1099 PUBLIC_KEYS_LIST_SCHEMA.validate(public_keys_list_elt.toXml()) 1096 PUBLIC_KEYS_LIST_SCHEMA.validate(public_keys_list_elt.toXml())
1100 except xmlschema.XMLSchemaValidationError: 1097 except xmlschema.XMLSchemaValidationError:
1101 pass 1098 pass
1102 else: 1099 else:
1103 pubkey_metadata_elts = \ 1100 pubkey_metadata_elts = list(
1104 list(public_keys_list_elt.elements(NS_OX, "pubkey-metadata")) 1101 public_keys_list_elt.elements(NS_OX, "pubkey-metadata")
1102 )
1105 1103
1106 if pubkey_metadata_elts is None: 1104 if pubkey_metadata_elts is None:
1107 log.warning(f"Malformed public keys list update item: {item_elt.toXml()}") 1105 log.warning(f"Malformed public keys list update item: {item_elt.toXml()}")
1108 return 1106 return
1109 1107
1110 new_public_keys_metadata = { PublicKeyMetadata( 1108 new_public_keys_metadata = {
1111 fingerprint=cast(str, pubkey_metadata_elt["v4-fingerprint"]), 1109 PublicKeyMetadata(
1112 timestamp=parse_datetime(cast(str, pubkey_metadata_elt["date"])) 1110 fingerprint=cast(str, pubkey_metadata_elt["v4-fingerprint"]),
1113 ) for pubkey_metadata_elt in pubkey_metadata_elts } 1111 timestamp=parse_datetime(cast(str, pubkey_metadata_elt["date"])),
1112 )
1113 for pubkey_metadata_elt in pubkey_metadata_elts
1114 }
1114 1115
1115 storage_key = STR_KEY_PUBLIC_KEYS_METADATA.format(sender.userhost()) 1116 storage_key = STR_KEY_PUBLIC_KEYS_METADATA.format(sender.userhost())
1116 1117
1117 local_public_keys_metadata = { 1118 local_public_keys_metadata = {
1118 PublicKeyMetadata.from_dict(pkm) 1119 PublicKeyMetadata.from_dict(pkm)
1138 1139
1139 # Check whether this update was for our account and make sure all of our keys are 1140 # Check whether this update was for our account and make sure all of our keys are
1140 # included in the update 1141 # included in the update
1141 if sender.userhost() == client.jid.userhost(): 1142 if sender.userhost() == client.jid.userhost():
1142 secret_keys = self.list_secret_keys(client) 1143 secret_keys = self.list_secret_keys(client)
1143 missing_keys = set(filter(lambda secret_key: all( 1144 missing_keys = set(
1144 key_metadata.fingerprint != secret_key.public_key.fingerprint 1145 filter(
1145 for key_metadata 1146 lambda secret_key: all(
1146 in new_public_keys_metadata 1147 key_metadata.fingerprint != secret_key.public_key.fingerprint
1147 ), secret_keys)) 1148 for key_metadata in new_public_keys_metadata
1149 ),
1150 secret_keys,
1151 )
1152 )
1148 1153
1149 if len(missing_keys) > 0: 1154 if len(missing_keys) > 0:
1150 log.warning( 1155 log.warning(
1151 "Public keys list update did not contain at least one of our keys." 1156 "Public keys list update did not contain at least one of our keys."
1152 f" {new_public_keys_metadata}" 1157 f" {new_public_keys_metadata}"
1153 ) 1158 )
1154 1159
1155 for missing_key in missing_keys: 1160 for missing_key in missing_keys:
1156 log.warning(missing_key.public_key.fingerprint) 1161 log.warning(missing_key.public_key.fingerprint)
1157 new_public_keys_metadata.add(PublicKeyMetadata( 1162 new_public_keys_metadata.add(
1158 fingerprint=missing_key.public_key.fingerprint, 1163 PublicKeyMetadata(
1159 timestamp=datetime.now(timezone.utc) 1164 fingerprint=missing_key.public_key.fingerprint,
1160 )) 1165 timestamp=datetime.now(timezone.utc),
1166 )
1167 )
1161 1168
1162 await self.publish_public_keys_list(client, new_public_keys_metadata) 1169 await self.publish_public_keys_list(client, new_public_keys_metadata)
1163 1170
1164 await self.__storage[profile].force( 1171 await self.__storage[profile].force(
1165 storage_key, 1172 storage_key, [pkm.to_dict() for pkm in new_public_keys_metadata]
1166 [pkm.to_dict() for pkm in new_public_keys_metadata]
1167 ) 1173 )
1168 1174
1169 def list_public_keys(self, client: SatXMPPClient, jid: jid.JID) -> Set[GPGPublicKey]: 1175 def list_public_keys(self, client: SatXMPPClient, jid: jid.JID) -> Set[GPGPublicKey]:
1170 """List GPG public keys available for a JID. 1176 """List GPG public keys available for a JID.
1171 1177
1209 public_keys_list = { 1215 public_keys_list = {
1210 PublicKeyMetadata.from_dict(pkm) 1216 PublicKeyMetadata.from_dict(pkm)
1211 for pkm in await self.__storage[client.profile].get(storage_key, []) 1217 for pkm in await self.__storage[client.profile].get(storage_key, [])
1212 } 1218 }
1213 1219
1214 public_keys_list.add(PublicKeyMetadata( 1220 public_keys_list.add(
1215 fingerprint=secret_key.public_key.fingerprint, 1221 PublicKeyMetadata(
1216 timestamp=datetime.now(timezone.utc) 1222 fingerprint=secret_key.public_key.fingerprint,
1217 )) 1223 timestamp=datetime.now(timezone.utc),
1224 )
1225 )
1218 1226
1219 await self.publish_public_keys_list(client, public_keys_list) 1227 await self.publish_public_keys_list(client, public_keys_list)
1220 1228
1221 await self.__storage[client.profile].force( 1229 await self.__storage[client.profile].force(
1222 storage_key, 1230 storage_key, [pkm.to_dict() for pkm in public_keys_list]
1223 [pkm.to_dict() for pkm in public_keys_list]
1224 ) 1231 )
1225 1232
1226 return secret_key 1233 return secret_key
1227 1234
1228 @staticmethod 1235 @staticmethod
1229 def __build_content_element( 1236 def __build_content_element(
1230 element_name: Literal["signcrypt", "sign", "crypt"], 1237 element_name: Literal["signcrypt", "sign", "crypt"],
1231 recipient_jids: Iterable[jid.JID], 1238 recipient_jids: Iterable[jid.JID],
1232 include_rpad: bool 1239 include_rpad: bool,
1233 ) -> Tuple[domish.Element, domish.Element]: 1240 ) -> Tuple[domish.Element, domish.Element]:
1234 """Build a content element. 1241 """Build a content element.
1235 1242
1236 @param element_name: The name of the content element. 1243 @param element_name: The name of the content element.
1237 @param recipient_jids: The intended recipients of this content element. Can be 1244 @param recipient_jids: The intended recipients of this content element. Can be
1252 # XEP-0373 doesn't specify bounds for the length of the random padding. This 1259 # XEP-0373 doesn't specify bounds for the length of the random padding. This
1253 # uses the bounds specified in XEP-0420 for the closely related rpad affix. 1260 # uses the bounds specified in XEP-0420 for the closely related rpad affix.
1254 rpad_length = secrets.randbelow(201) 1261 rpad_length = secrets.randbelow(201)
1255 rpad_content = "".join( 1262 rpad_content = "".join(
1256 secrets.choice(string.digits + string.ascii_letters + string.punctuation) 1263 secrets.choice(string.digits + string.ascii_letters + string.punctuation)
1257 for __ 1264 for __ in range(rpad_length)
1258 in range(rpad_length)
1259 ) 1265 )
1260 content_elt.addElement("rpad", content=rpad_content) 1266 content_elt.addElement("rpad", content=rpad_content)
1261 1267
1262 payload_elt = content_elt.addElement("payload") 1268 payload_elt = content_elt.addElement("payload")
1263 1269
1264 return content_elt, payload_elt 1270 return content_elt, payload_elt
1265 1271
1266 @staticmethod 1272 @staticmethod
1267 def build_signcrypt_element( 1273 def build_signcrypt_element(
1268 recipient_jids: Iterable[jid.JID] 1274 recipient_jids: Iterable[jid.JID],
1269 ) -> Tuple[domish.Element, domish.Element]: 1275 ) -> Tuple[domish.Element, domish.Element]:
1270 """Build a ``<signcrypt/>`` content element. 1276 """Build a ``<signcrypt/>`` content element.
1271 1277
1272 @param recipient_jids: The intended recipients of this content element. Can be 1278 @param recipient_jids: The intended recipients of this content element. Can be
1273 bare JIDs. 1279 bare JIDs.
1280 1286
1281 return XEP_0373.__build_content_element("signcrypt", recipient_jids, True) 1287 return XEP_0373.__build_content_element("signcrypt", recipient_jids, True)
1282 1288
1283 @staticmethod 1289 @staticmethod
1284 def build_sign_element( 1290 def build_sign_element(
1285 recipient_jids: Iterable[jid.JID], 1291 recipient_jids: Iterable[jid.JID], include_rpad: bool
1286 include_rpad: bool
1287 ) -> Tuple[domish.Element, domish.Element]: 1292 ) -> Tuple[domish.Element, domish.Element]:
1288 """Build a ``<sign/>`` content element. 1293 """Build a ``<sign/>`` content element.
1289 1294
1290 @param recipient_jids: The intended recipients of this content element. Can be 1295 @param recipient_jids: The intended recipients of this content element. Can be
1291 bare JIDs. 1296 bare JIDs.
1300 1305
1301 return XEP_0373.__build_content_element("sign", recipient_jids, include_rpad) 1306 return XEP_0373.__build_content_element("sign", recipient_jids, include_rpad)
1302 1307
1303 @staticmethod 1308 @staticmethod
1304 def build_crypt_element( 1309 def build_crypt_element(
1305 recipient_jids: Iterable[jid.JID] 1310 recipient_jids: Iterable[jid.JID],
1306 ) -> Tuple[domish.Element, domish.Element]: 1311 ) -> Tuple[domish.Element, domish.Element]:
1307 """Build a ``<crypt/>`` content element. 1312 """Build a ``<crypt/>`` content element.
1308 1313
1309 @param recipient_jids: The intended recipients of this content element. Specifying 1314 @param recipient_jids: The intended recipients of this content element. Specifying
1310 the intended recipients is OPTIONAL for the ``<crypt/>`` content element. Can 1315 the intended recipients is OPTIONAL for the ``<crypt/>`` content element. Can
1317 1322
1318 async def build_openpgp_element( 1323 async def build_openpgp_element(
1319 self, 1324 self,
1320 client: SatXMPPClient, 1325 client: SatXMPPClient,
1321 content_elt: domish.Element, 1326 content_elt: domish.Element,
1322 recipient_jids: Set[jid.JID] 1327 recipient_jids: Set[jid.JID],
1323 ) -> domish.Element: 1328 ) -> domish.Element:
1324 """Build an ``<openpgp/>`` element. 1329 """Build an ``<openpgp/>`` element.
1325 1330
1326 @param client: The client to perform this operation with. 1331 @param client: The client to perform this operation with.
1327 @param content_elt: The content element to contain in the ``<openpgp/>`` element. 1332 @param content_elt: The content element to contain in the ``<openpgp/>`` element.
1331 1336
1332 gpg_provider = get_gpg_provider(self.host, client) 1337 gpg_provider = get_gpg_provider(self.host, client)
1333 1338
1334 # TODO: I'm not sure whether we want to sign with all keys by default or choose 1339 # TODO: I'm not sure whether we want to sign with all keys by default or choose
1335 # just one key/a subset of keys to sign with. 1340 # just one key/a subset of keys to sign with.
1336 signing_keys = set(filter( 1341 signing_keys = set(
1337 lambda secret_key: gpg_provider.can_sign(secret_key.public_key), 1342 filter(
1338 self.list_secret_keys(client) 1343 lambda secret_key: gpg_provider.can_sign(secret_key.public_key),
1339 )) 1344 self.list_secret_keys(client),
1345 )
1346 )
1340 1347
1341 encryption_keys: Set[GPGPublicKey] = set() 1348 encryption_keys: Set[GPGPublicKey] = set()
1342 1349
1343 for recipient_jid in recipient_jids: 1350 for recipient_jid in recipient_jids:
1344 # import all keys of the recipient 1351 # import all keys of the recipient
1368 async def unpack_openpgp_element( 1375 async def unpack_openpgp_element(
1369 self, 1376 self,
1370 client: SatXMPPClient, 1377 client: SatXMPPClient,
1371 openpgp_elt: domish.Element, 1378 openpgp_elt: domish.Element,
1372 element_name: Literal["signcrypt", "sign", "crypt"], 1379 element_name: Literal["signcrypt", "sign", "crypt"],
1373 sender_jid: jid.JID 1380 sender_jid: jid.JID,
1374 ) -> Tuple[domish.Element, datetime]: 1381 ) -> Tuple[domish.Element, datetime]:
1375 """Verify, decrypt and unpack an ``<openpgp/>`` element. 1382 """Verify, decrypt and unpack an ``<openpgp/>`` element.
1376 1383
1377 @param client: The client to perform this operation with. 1384 @param client: The client to perform this operation with.
1378 @param openpgp_elt: The ``<openpgp/>`` element. 1385 @param openpgp_elt: The ``<openpgp/>`` element.
1390 the calling code. 1397 the calling code.
1391 """ 1398 """
1392 1399
1393 gpg_provider = get_gpg_provider(self.host, client) 1400 gpg_provider = get_gpg_provider(self.host, client)
1394 1401
1395 decryption_keys = set(filter( 1402 decryption_keys = set(
1396 lambda secret_key: gpg_provider.can_encrypt(secret_key.public_key), 1403 filter(
1397 self.list_secret_keys(client) 1404 lambda secret_key: gpg_provider.can_encrypt(secret_key.public_key),
1398 )) 1405 self.list_secret_keys(client),
1406 )
1407 )
1399 1408
1400 # import all keys of the sender 1409 # import all keys of the sender
1401 all_public_keys = await self.import_all_public_keys(client, sender_jid) 1410 all_public_keys = await self.import_all_public_keys(client, sender_jid)
1402 1411
1403 # Filter for keys that can sign 1412 # Filter for keys that can sign
1415 openpgp_message = base64.b64decode(str(openpgp_elt)) 1424 openpgp_message = base64.b64decode(str(openpgp_elt))
1416 content: bytes 1425 content: bytes
1417 1426
1418 if element_name == "signcrypt": 1427 if element_name == "signcrypt":
1419 content = gpg_provider.decrypt( 1428 content = gpg_provider.decrypt(
1420 openpgp_message, 1429 openpgp_message, decryption_keys, public_keys=verification_keys
1421 decryption_keys,
1422 public_keys=verification_keys
1423 ) 1430 )
1424 elif element_name == "sign": 1431 elif element_name == "sign":
1425 content = gpg_provider.verify(openpgp_message, verification_keys) 1432 content = gpg_provider.verify(openpgp_message, verification_keys)
1426 elif element_name == "crypt": 1433 elif element_name == "crypt":
1427 content = gpg_provider.decrypt(openpgp_message, decryption_keys) 1434 content = gpg_provider.decrypt(openpgp_message, decryption_keys)
1428 else: 1435 else:
1429 assert_never(element_name) 1436 assert_never(element_name)
1430 1437
1431 try: 1438 try:
1432 content_elt = cast( 1439 content_elt = cast(
1433 domish.Element, 1440 domish.Element, xml_tools.ElementParser()(content.decode("utf-8"))
1434 xml_tools.ElementParser()(content.decode("utf-8"))
1435 ) 1441 )
1436 except UnicodeDecodeError as e: 1442 except UnicodeDecodeError as e:
1437 raise exceptions.ParsingError("UTF-8 decoding error") from e 1443 raise exceptions.ParsingError("UTF-8 decoding error") from e
1438 1444
1439 try: 1445 try:
1444 ) from e 1450 ) from e
1445 1451
1446 if content_elt.name != element_name: 1452 if content_elt.name != element_name:
1447 raise exceptions.ParsingError(f"Not a <{element_name}/> element.") 1453 raise exceptions.ParsingError(f"Not a <{element_name}/> element.")
1448 1454
1449 recipient_jids = \ 1455 recipient_jids = {
1450 { jid.JID(to_elt["jid"]) for to_elt in content_elt.elements(NS_OX, "to") } 1456 jid.JID(to_elt["jid"]) for to_elt in content_elt.elements(NS_OX, "to")
1457 }
1451 1458
1452 if ( 1459 if (
1453 client.jid.userhostJID() not in { jid.userhostJID() for jid in recipient_jids } 1460 client.jid.userhostJID() not in {jid.userhostJID() for jid in recipient_jids}
1454 and element_name != "crypt" 1461 and element_name != "crypt"
1455 ): 1462 ):
1456 raise VerificationError( 1463 raise VerificationError(
1457 f"Recipient list in <{element_name}/> element does not list our (bare)" 1464 f"Recipient list in <{element_name}/> element does not list our (bare)"
1458 f" JID." 1465 f" JID."
1465 payload_elt = next(content_elt.elements(NS_OX, "payload")) 1472 payload_elt = next(content_elt.elements(NS_OX, "payload"))
1466 1473
1467 return payload_elt, timestamp 1474 return payload_elt, timestamp
1468 1475
1469 async def publish_public_key( 1476 async def publish_public_key(
1470 self, 1477 self, client: SatXMPPClient, public_key: GPGPublicKey
1471 client: SatXMPPClient,
1472 public_key: GPGPublicKey
1473 ) -> None: 1478 ) -> None:
1474 """Publish a public key. 1479 """Publish a public key.
1475 1480
1476 @param client: The client. 1481 @param client: The client.
1477 @param public_key: The public key to publish. 1482 @param public_key: The public key to publish.
1497 format_datetime(), 1502 format_datetime(),
1498 extra={ 1503 extra={
1499 XEP_0060.EXTRA_PUBLISH_OPTIONS: { 1504 XEP_0060.EXTRA_PUBLISH_OPTIONS: {
1500 XEP_0060.OPT_PERSIST_ITEMS: "true", 1505 XEP_0060.OPT_PERSIST_ITEMS: "true",
1501 XEP_0060.OPT_ACCESS_MODEL: "open", 1506 XEP_0060.OPT_ACCESS_MODEL: "open",
1502 XEP_0060.OPT_MAX_ITEMS: 1 1507 XEP_0060.OPT_MAX_ITEMS: 1,
1503 }, 1508 },
1504 # TODO: Do we really want publish_without_options here? 1509 # TODO: Do we really want publish_without_options here?
1505 XEP_0060.EXTRA_ON_PRECOND_NOT_MET: "publish_without_options" 1510 XEP_0060.EXTRA_ON_PRECOND_NOT_MET: "publish_without_options",
1506 } 1511 },
1507 ) 1512 )
1508 except Exception as e: 1513 except Exception as e:
1509 raise XMPPInteractionFailed("Publishing the public key failed.") from e 1514 raise XMPPInteractionFailed("Publishing the public key failed.") from e
1510 1515
1511 async def import_all_public_keys( 1516 async def import_all_public_keys(
1512 self, 1517 self, client: SatXMPPClient, entity_jid: jid.JID
1513 client: SatXMPPClient,
1514 entity_jid: jid.JID
1515 ) -> Set[GPGPublicKey]: 1518 ) -> Set[GPGPublicKey]:
1516 """import all public keys of a JID that have not been imported before. 1519 """import all public keys of a JID that have not been imported before.
1517 1520
1518 @param client: The client. 1521 @param client: The client.
1519 @param jid: The JID. Can be a bare JID. 1522 @param jid: The JID. Can be a bare JID.
1533 if not public_keys_metadata: 1536 if not public_keys_metadata:
1534 public_keys_metadata = await self.download_public_keys_list( 1537 public_keys_metadata = await self.download_public_keys_list(
1535 client, entity_jid 1538 client, entity_jid
1536 ) 1539 )
1537 if not public_keys_metadata: 1540 if not public_keys_metadata:
1538 raise exceptions.NotFound( 1541 raise exceptions.NotFound(f"Can't find public keys for {entity_jid}")
1539 f"Can't find public keys for {entity_jid}"
1540 )
1541 else: 1542 else:
1542 await self.__storage[client.profile].aset( 1543 await self.__storage[client.profile].aset(
1543 storage_key, 1544 storage_key, [pkm.to_dict() for pkm in public_keys_metadata]
1544 [pkm.to_dict() for pkm in public_keys_metadata]
1545 ) 1545 )
1546 1546
1547 1547 missing_keys = set(
1548 missing_keys = set(filter(lambda public_key_metadata: all( 1548 filter(
1549 public_key_metadata.fingerprint != public_key.fingerprint 1549 lambda public_key_metadata: all(
1550 for public_key 1550 public_key_metadata.fingerprint != public_key.fingerprint
1551 in available_public_keys 1551 for public_key in available_public_keys
1552 ), public_keys_metadata)) 1552 ),
1553 public_keys_metadata,
1554 )
1555 )
1553 1556
1554 for missing_key in missing_keys: 1557 for missing_key in missing_keys:
1555 try: 1558 try:
1556 available_public_keys.add( 1559 available_public_keys.add(
1557 await self.import_public_key(client, entity_jid, missing_key.fingerprint) 1560 await self.import_public_key(
1561 client, entity_jid, missing_key.fingerprint
1562 )
1558 ) 1563 )
1559 except Exception as e: 1564 except Exception as e:
1560 log.warning( 1565 log.warning(
1561 f"import of public key {missing_key.fingerprint} owned by" 1566 f"import of public key {missing_key.fingerprint} owned by"
1562 f" {entity_jid.userhost()} failed, ignoring: {e}" 1567 f" {entity_jid.userhost()} failed, ignoring: {e}"
1563 ) 1568 )
1564 1569
1565 return available_public_keys 1570 return available_public_keys
1566 1571
1567 async def import_public_key( 1572 async def import_public_key(
1568 self, 1573 self, client: SatXMPPClient, jid: jid.JID, fingerprint: str
1569 client: SatXMPPClient,
1570 jid: jid.JID,
1571 fingerprint: str
1572 ) -> GPGPublicKey: 1574 ) -> GPGPublicKey:
1573 """import a public key. 1575 """import a public key.
1574 1576
1575 @param client: The client. 1577 @param client: The client.
1576 @param jid: The JID owning the public key. Can be a bare JID. 1578 @param jid: The JID owning the public key. Can be a bare JID.
1587 1589
1588 node = f"urn:xmpp:openpgp:0:public-keys:{fingerprint}" 1590 node = f"urn:xmpp:openpgp:0:public-keys:{fingerprint}"
1589 1591
1590 try: 1592 try:
1591 items, __ = await self.__xep_0060.get_items( 1593 items, __ = await self.__xep_0060.get_items(
1592 client, 1594 client, jid.userhostJID(), node, max_items=1
1593 jid.userhostJID(),
1594 node,
1595 max_items=1
1596 ) 1595 )
1597 except exceptions.NotFound as e: 1596 except exceptions.NotFound as e:
1598 raise exceptions.NotFound( 1597 raise exceptions.NotFound(
1599 f"No public key with fingerprint {fingerprint} published by JID" 1598 f"No public key with fingerprint {fingerprint} published by JID"
1600 f" {jid.userhost()}." 1599 f" {jid.userhost()}."
1609 f"No public key with fingerprint {fingerprint} published by JID" 1608 f"No public key with fingerprint {fingerprint} published by JID"
1610 f" {jid.userhost()}." 1609 f" {jid.userhost()}."
1611 ) from e 1610 ) from e
1612 1611
1613 pubkey_elt = cast( 1612 pubkey_elt = cast(
1614 Optional[domish.Element], 1613 Optional[domish.Element], next(item_elt.elements(NS_OX, "pubkey"), None)
1615 next(item_elt.elements(NS_OX, "pubkey"), None)
1616 ) 1614 )
1617 1615
1618 if pubkey_elt is None: 1616 if pubkey_elt is None:
1619 raise exceptions.ParsingError( 1617 raise exceptions.ParsingError(
1620 f"Publish-Subscribe item of JID {jid.userhost()} doesn't contain pubkey" 1618 f"Publish-Subscribe item of JID {jid.userhost()} doesn't contain pubkey"
1627 raise exceptions.ParsingError( 1625 raise exceptions.ParsingError(
1628 f"Publish-Subscribe item of JID {jid.userhost()} doesn't pass pubkey" 1626 f"Publish-Subscribe item of JID {jid.userhost()} doesn't pass pubkey"
1629 f" schema validation." 1627 f" schema validation."
1630 ) from e 1628 ) from e
1631 1629
1632 public_key = gpg_provider.import_public_key(base64.b64decode(str( 1630 public_key = gpg_provider.import_public_key(
1633 next(pubkey_elt.elements(NS_OX, "data")) 1631 base64.b64decode(str(next(pubkey_elt.elements(NS_OX, "data"))))
1634 ))) 1632 )
1635 1633
1636 return public_key 1634 return public_key
1637 1635
1638 async def publish_public_keys_list( 1636 async def publish_public_keys_list(
1639 self, 1637 self, client: SatXMPPClient, public_keys_list: Iterable[PublicKeyMetadata]
1640 client: SatXMPPClient,
1641 public_keys_list: Iterable[PublicKeyMetadata]
1642 ) -> None: 1638 ) -> None:
1643 """Publish/update the own public keys list. 1639 """Publish/update the own public keys list.
1644 1640
1645 @param client: The client. 1641 @param client: The client.
1646 @param public_keys_list: The public keys list. 1642 @param public_keys_list: The public keys list.
1648 1644
1649 @warning: All public keys referenced in the public keys list MUST be published 1645 @warning: All public keys referenced in the public keys list MUST be published
1650 beforehand. 1646 beforehand.
1651 """ 1647 """
1652 1648
1653 if len({ pkm.fingerprint for pkm in public_keys_list }) != len(public_keys_list): 1649 if len({pkm.fingerprint for pkm in public_keys_list}) != len(public_keys_list):
1654 raise ValueError("Public keys list contains duplicate fingerprints.") 1650 raise ValueError("Public keys list contains duplicate fingerprints.")
1655 1651
1656 node = "urn:xmpp:openpgp:0:public-keys" 1652 node = "urn:xmpp:openpgp:0:public-keys"
1657 1653
1658 public_keys_list_elt = domish.Element((NS_OX, "public-keys-list")) 1654 public_keys_list_elt = domish.Element((NS_OX, "public-keys-list"))
1671 item_id=XEP_0060.ID_SINGLETON, 1667 item_id=XEP_0060.ID_SINGLETON,
1672 extra={ 1668 extra={
1673 XEP_0060.EXTRA_PUBLISH_OPTIONS: { 1669 XEP_0060.EXTRA_PUBLISH_OPTIONS: {
1674 XEP_0060.OPT_PERSIST_ITEMS: "true", 1670 XEP_0060.OPT_PERSIST_ITEMS: "true",
1675 XEP_0060.OPT_ACCESS_MODEL: "open", 1671 XEP_0060.OPT_ACCESS_MODEL: "open",
1676 XEP_0060.OPT_MAX_ITEMS: 1 1672 XEP_0060.OPT_MAX_ITEMS: 1,
1677 }, 1673 },
1678 # TODO: Do we really want publish_without_options here? 1674 # TODO: Do we really want publish_without_options here?
1679 XEP_0060.EXTRA_ON_PRECOND_NOT_MET: "publish_without_options" 1675 XEP_0060.EXTRA_ON_PRECOND_NOT_MET: "publish_without_options",
1680 } 1676 },
1681 ) 1677 )
1682 except Exception as e: 1678 except Exception as e:
1683 raise XMPPInteractionFailed("Publishing the public keys list failed.") from e 1679 raise XMPPInteractionFailed("Publishing the public keys list failed.") from e
1684 1680
1685 async def download_public_keys_list( 1681 async def download_public_keys_list(
1686 self, 1682 self, client: SatXMPPClient, jid: jid.JID
1687 client: SatXMPPClient,
1688 jid: jid.JID
1689 ) -> Optional[Set[PublicKeyMetadata]]: 1683 ) -> Optional[Set[PublicKeyMetadata]]:
1690 """Download the public keys list of a JID. 1684 """Download the public keys list of a JID.
1691 1685
1692 @param client: The client. 1686 @param client: The client.
1693 @param jid: The JID. Can be a bare JID. 1687 @param jid: The JID. Can be a bare JID.
1699 1693
1700 node = "urn:xmpp:openpgp:0:public-keys" 1694 node = "urn:xmpp:openpgp:0:public-keys"
1701 1695
1702 try: 1696 try:
1703 items, __ = await self.__xep_0060.get_items( 1697 items, __ = await self.__xep_0060.get_items(
1704 client, 1698 client, jid.userhostJID(), node, max_items=1
1705 jid.userhostJID(),
1706 node,
1707 max_items=1
1708 ) 1699 )
1709 except exceptions.NotFound: 1700 except exceptions.NotFound:
1710 return None 1701 return None
1711 except Exception as e: 1702 except Exception as e:
1712 raise XMPPInteractionFailed() from e 1703 raise XMPPInteractionFailed() from e
1716 except IndexError: 1707 except IndexError:
1717 return None 1708 return None
1718 1709
1719 public_keys_list_elt = cast( 1710 public_keys_list_elt = cast(
1720 Optional[domish.Element], 1711 Optional[domish.Element],
1721 next(item_elt.elements(NS_OX, "public-keys-list"), None) 1712 next(item_elt.elements(NS_OX, "public-keys-list"), None),
1722 ) 1713 )
1723 1714
1724 if public_keys_list_elt is None: 1715 if public_keys_list_elt is None:
1725 return None 1716 return None
1726 1717
1733 ) from e 1724 ) from e
1734 1725
1735 return { 1726 return {
1736 PublicKeyMetadata( 1727 PublicKeyMetadata(
1737 fingerprint=pubkey_metadata_elt["v4-fingerprint"], 1728 fingerprint=pubkey_metadata_elt["v4-fingerprint"],
1738 timestamp=parse_datetime(pubkey_metadata_elt["date"]) 1729 timestamp=parse_datetime(pubkey_metadata_elt["date"]),
1739 ) 1730 )
1740 for pubkey_metadata_elt 1731 for pubkey_metadata_elt in public_keys_list_elt.elements(
1741 in public_keys_list_elt.elements(NS_OX, "pubkey-metadata") 1732 NS_OX, "pubkey-metadata"
1733 )
1742 } 1734 }
1743 1735
1744 async def __prepare_secret_key_synchronization( 1736 async def __prepare_secret_key_synchronization(
1745 self, 1737 self, client: SatXMPPClient
1746 client: SatXMPPClient
1747 ) -> Optional[domish.Element]: 1738 ) -> Optional[domish.Element]:
1748 """Prepare for secret key synchronization. 1739 """Prepare for secret key synchronization.
1749 1740
1750 Makes sure the relative protocols and protocol extensions are supported by the 1741 Makes sure the relative protocols and protocol extensions are supported by the
1751 server and makes sure that the PEP node for secret synchronization exists and is 1742 server and makes sure that the PEP node for secret synchronization exists and is
1758 protocols or protocol extensions. 1749 protocols or protocol extensions.
1759 @raise XMPPInteractionFailed: if any interaction via XMPP failed. 1750 @raise XMPPInteractionFailed: if any interaction via XMPP failed.
1760 """ 1751 """
1761 1752
1762 try: 1753 try:
1763 infos = cast(DiscoInfo, await self.host.memory.disco.get_infos( 1754 infos = cast(
1764 client, 1755 DiscoInfo,
1765 client.jid.userhostJID() 1756 await self.host.memory.disco.get_infos(client, client.jid.userhostJID()),
1766 )) 1757 )
1767 except Exception as e: 1758 except Exception as e:
1768 raise XMPPInteractionFailed( 1759 raise XMPPInteractionFailed(
1769 "Error performing service discovery on the own bare JID." 1760 "Error performing service discovery on the own bare JID."
1770 ) from e 1761 ) from e
1771 1762
1778 if "http://jabber.org/protocol/pubsub#access-whitelist" not in features: 1769 if "http://jabber.org/protocol/pubsub#access-whitelist" not in features:
1779 raise exceptions.FeatureNotFound( 1770 raise exceptions.FeatureNotFound(
1780 "Server doesn't support the whitelist access model." 1771 "Server doesn't support the whitelist access model."
1781 ) 1772 )
1782 1773
1783 persistent_items_supported = \ 1774 persistent_items_supported = (
1784 "http://jabber.org/protocol/pubsub#persistent-items" in features 1775 "http://jabber.org/protocol/pubsub#persistent-items" in features
1776 )
1785 1777
1786 # TODO: persistent-items is a SHOULD, how do we handle the feature missing? 1778 # TODO: persistent-items is a SHOULD, how do we handle the feature missing?
1787 1779
1788 node = "urn:xmpp:openpgp:0:secret-key" 1780 node = "urn:xmpp:openpgp:0:secret-key"
1789 1781
1790 try: 1782 try:
1791 items, __ = await self.__xep_0060.get_items( 1783 items, __ = await self.__xep_0060.get_items(
1792 client, 1784 client, client.jid.userhostJID(), node, max_items=1
1793 client.jid.userhostJID(),
1794 node,
1795 max_items=1
1796 ) 1785 )
1797 except exceptions.NotFound: 1786 except exceptions.NotFound:
1798 try: 1787 try:
1799 await self.__xep_0060.createNode( 1788 await self.__xep_0060.createNode(
1800 client, 1789 client,
1801 client.jid.userhostJID(), 1790 client.jid.userhostJID(),
1802 node, 1791 node,
1803 { 1792 {
1804 XEP_0060.OPT_PERSIST_ITEMS: "true", 1793 XEP_0060.OPT_PERSIST_ITEMS: "true",
1805 XEP_0060.OPT_ACCESS_MODEL: "whitelist", 1794 XEP_0060.OPT_ACCESS_MODEL: "whitelist",
1806 XEP_0060.OPT_MAX_ITEMS: "1" 1795 XEP_0060.OPT_MAX_ITEMS: "1",
1807 } 1796 },
1808 ) 1797 )
1809 except Exception as e: 1798 except Exception as e:
1810 raise XMPPInteractionFailed( 1799 raise XMPPInteractionFailed(
1811 "Error creating the secret key synchronization node." 1800 "Error creating the secret key synchronization node."
1812 ) from e 1801 ) from e
1819 return cast(domish.Element, items[0]) 1808 return cast(domish.Element, items[0])
1820 except IndexError: 1809 except IndexError:
1821 return None 1810 return None
1822 1811
1823 async def export_secret_keys( 1812 async def export_secret_keys(
1824 self, 1813 self, client: SatXMPPClient, secret_keys: Iterable[GPGSecretKey]
1825 client: SatXMPPClient,
1826 secret_keys: Iterable[GPGSecretKey]
1827 ) -> str: 1814 ) -> str:
1828 """Export secret keys to synchronize them with other devices. 1815 """Export secret keys to synchronize them with other devices.
1829 1816
1830 @param client: The client. 1817 @param client: The client.
1831 @param secret_keys: The secret keys to export. 1818 @param secret_keys: The secret keys to export.
1852 secretkey_elt = domish.Element((NS_OX, "secretkey")) 1839 secretkey_elt = domish.Element((NS_OX, "secretkey"))
1853 secretkey_elt.addContent(base64.b64encode(ciphertext).decode("ASCII")) 1840 secretkey_elt.addContent(base64.b64encode(ciphertext).decode("ASCII"))
1854 1841
1855 try: 1842 try:
1856 await self.__xep_0060.send_item( 1843 await self.__xep_0060.send_item(
1857 client, 1844 client, client.jid.userhostJID(), node, secretkey_elt
1858 client.jid.userhostJID(),
1859 node,
1860 secretkey_elt
1861 ) 1845 )
1862 except Exception as e: 1846 except Exception as e:
1863 raise XMPPInteractionFailed("Publishing the secret keys failed.") from e 1847 raise XMPPInteractionFailed("Publishing the secret keys failed.") from e
1864 1848
1865 return backup_code 1849 return backup_code
1883 item_elt = await self.__prepare_secret_key_synchronization(client) 1867 item_elt = await self.__prepare_secret_key_synchronization(client)
1884 if item_elt is None: 1868 if item_elt is None:
1885 return None 1869 return None
1886 1870
1887 secretkey_elt = cast( 1871 secretkey_elt = cast(
1888 Optional[domish.Element], 1872 Optional[domish.Element], next(item_elt.elements(NS_OX, "secretkey"), None)
1889 next(item_elt.elements(NS_OX, "secretkey"), None)
1890 ) 1873 )
1891 1874
1892 if secretkey_elt is None: 1875 if secretkey_elt is None:
1893 return None 1876 return None
1894 1877
1900 ) from e 1883 ) from e
1901 1884
1902 return base64.b64decode(str(secretkey_elt)) 1885 return base64.b64decode(str(secretkey_elt))
1903 1886
1904 def import_secret_keys( 1887 def import_secret_keys(
1905 self, 1888 self, client: SatXMPPClient, ciphertext: bytes, backup_code: str
1906 client: SatXMPPClient,
1907 ciphertext: bytes,
1908 backup_code: str
1909 ) -> Set[GPGSecretKey]: 1889 ) -> Set[GPGSecretKey]:
1910 """import previously downloaded secret keys. 1890 """import previously downloaded secret keys.
1911 1891
1912 The downloading and importing steps are separate since a backup code is required 1892 The downloading and importing steps are separate since a backup code is required
1913 for the import and it should be possible to try multiple backup codes without 1893 for the import and it should be possible to try multiple backup codes without
1923 @raise DecryptionFailed: on decryption failure. 1903 @raise DecryptionFailed: on decryption failure.
1924 """ 1904 """
1925 1905
1926 gpg_provider = get_gpg_provider(self.host, client) 1906 gpg_provider = get_gpg_provider(self.host, client)
1927 1907
1928 return gpg_provider.restore_secret_keys(gpg_provider.decrypt_symmetrically( 1908 return gpg_provider.restore_secret_keys(
1929 ciphertext, 1909 gpg_provider.decrypt_symmetrically(ciphertext, backup_code)
1930 backup_code 1910 )
1931 ))
1932 1911
1933 @staticmethod 1912 @staticmethod
1934 def __get_joined_muc_users( 1913 def __get_joined_muc_users(
1935 client: SatXMPPClient, 1914 client: SatXMPPClient, xep_0045: XEP_0045, room_jid: jid.JID
1936 xep_0045: XEP_0045,
1937 room_jid: jid.JID
1938 ) -> Set[jid.JID]: 1915 ) -> Set[jid.JID]:
1939 """ 1916 """
1940 @param client: The client. 1917 @param client: The client.
1941 @param xep_0045: A MUC plugin instance. 1918 @param xep_0045: A MUC plugin instance.
1942 @param room_jid: The room JID. 1919 @param room_jid: The room JID.
1966 bare_jids.add(entity.jid.userhostJID()) 1943 bare_jids.add(entity.jid.userhostJID())
1967 1944
1968 return bare_jids 1945 return bare_jids
1969 1946
1970 async def get_trust( 1947 async def get_trust(
1971 self, 1948 self, client: SatXMPPClient, public_key: GPGPublicKey, owner: jid.JID
1972 client: SatXMPPClient,
1973 public_key: GPGPublicKey,
1974 owner: jid.JID
1975 ) -> TrustLevel: 1949 ) -> TrustLevel:
1976 """Query the trust level of a public key. 1950 """Query the trust level of a public key.
1977 1951
1978 @param client: The client to perform this operation under. 1952 @param client: The client to perform this operation under.
1979 @param public_key: The public key. 1953 @param public_key: The public key.
1991 async def set_trust( 1965 async def set_trust(
1992 self, 1966 self,
1993 client: SatXMPPClient, 1967 client: SatXMPPClient,
1994 public_key: GPGPublicKey, 1968 public_key: GPGPublicKey,
1995 owner: jid.JID, 1969 owner: jid.JID,
1996 trust_level: TrustLevel 1970 trust_level: TrustLevel,
1997 ) -> None: 1971 ) -> None:
1998 """Set the trust level of a public key. 1972 """Set the trust level of a public key.
1999 1973
2000 @param client: The client to perform this operation under. 1974 @param client: The client to perform this operation under.
2001 @param public_key: The public key. 1975 @param public_key: The public key.
2006 key = f"/trust/{owner.userhost()}/{public_key.fingerprint}" 1980 key = f"/trust/{owner.userhost()}/{public_key.fingerprint}"
2007 1981
2008 await self.__storage[client.profile].force(key, trust_level.name) 1982 await self.__storage[client.profile].force(key, trust_level.name)
2009 1983
2010 async def get_trust_ui( # pylint: disable=invalid-name 1984 async def get_trust_ui( # pylint: disable=invalid-name
2011 self, 1985 self, client: SatXMPPClient, entity: jid.JID
2012 client: SatXMPPClient,
2013 entity: jid.JID
2014 ) -> xml_tools.XMLUI: 1986 ) -> xml_tools.XMLUI:
2015 """ 1987 """
2016 @param client: The client. 1988 @param client: The client.
2017 @param entity: The entity whose device trust levels to manage. 1989 @param entity: The entity whose device trust levels to manage.
2018 @return: An XMLUI instance which opens a form to manage the trust level of all 1990 @return: An XMLUI instance which opens a form to manage the trust level of all
2024 1996
2025 bare_jids: Set[jid.JID] 1997 bare_jids: Set[jid.JID]
2026 if self.__xep_0045 is not None and self.__xep_0045.is_joined_room(client, entity): 1998 if self.__xep_0045 is not None and self.__xep_0045.is_joined_room(client, entity):
2027 bare_jids = self.__get_joined_muc_users(client, self.__xep_0045, entity) 1999 bare_jids = self.__get_joined_muc_users(client, self.__xep_0045, entity)
2028 else: 2000 else:
2029 bare_jids = { entity.userhostJID() } 2001 bare_jids = {entity.userhostJID()}
2030 2002
2031 all_public_keys = list({ 2003 all_public_keys = list(
2032 bare_jid: list(self.list_public_keys(client, bare_jid)) 2004 {
2033 for bare_jid 2005 bare_jid: list(self.list_public_keys(client, bare_jid))
2034 in bare_jids 2006 for bare_jid in bare_jids
2035 }.items()) 2007 }.items()
2008 )
2036 2009
2037 async def callback( 2010 async def callback(
2038 data: Any, 2011 data: Any, profile: str # pylint: disable=unused-argument
2039 profile: str # pylint: disable=unused-argument
2040 ) -> Dict[Never, Never]: 2012 ) -> Dict[Never, Never]:
2041 """ 2013 """
2042 @param data: The XMLUI result produces by the trust UI form. 2014 @param data: The XMLUI result produces by the trust UI form.
2043 @param profile: The profile. 2015 @param profile: The profile.
2044 @return: An empty dictionary. The type of the return value was chosen 2016 @return: An empty dictionary. The type of the return value was chosen
2047 2019
2048 if C.bool(data.get("cancelled", "false")): 2020 if C.bool(data.get("cancelled", "false")):
2049 return {} 2021 return {}
2050 2022
2051 data_form_result = cast( 2023 data_form_result = cast(
2052 Dict[str, str], 2024 Dict[str, str], xml_tools.xmlui_result_2_data_form_result(data)
2053 xml_tools.xmlui_result_2_data_form_result(data)
2054 ) 2025 )
2055 for key, value in data_form_result.items(): 2026 for key, value in data_form_result.items():
2056 if not key.startswith("trust_"): 2027 if not key.startswith("trust_"):
2057 continue 2028 continue
2058 2029
2068 return {} 2039 return {}
2069 2040
2070 submit_id = self.host.register_callback(callback, with_data=True, one_shot=True) 2041 submit_id = self.host.register_callback(callback, with_data=True, one_shot=True)
2071 2042
2072 result = xml_tools.XMLUI( 2043 result = xml_tools.XMLUI(
2073 panel_type=C.XMLUI_FORM, 2044 panel_type=C.XMLUI_FORM, title=D_("OX trust management"), submit_id=submit_id
2074 title=D_("OX trust management"),
2075 submit_id=submit_id
2076 ) 2045 )
2077 # Casting this to Any, otherwise all calls on the variable cause type errors 2046 # Casting this to Any, otherwise all calls on the variable cause type errors
2078 # pylint: disable=no-member 2047 # pylint: disable=no-member
2079 trust_ui = cast(Any, result) 2048 trust_ui = cast(Any, result)
2080 trust_ui.addText(D_( 2049 trust_ui.addText(
2081 "This is OX trusting system. You'll see below the GPG keys of your " 2050 D_(
2082 "contacts, and a list selection to trust them or not. A trusted key " 2051 "This is OX trusting system. You'll see below the GPG keys of your "
2083 "can read your messages in plain text, so be sure to only validate " 2052 "contacts, and a list selection to trust them or not. A trusted key "
2084 "keys that you are sure are belonging to your contact. It's better " 2053 "can read your messages in plain text, so be sure to only validate "
2085 "to do this when you are next to your contact, so " 2054 "keys that you are sure are belonging to your contact. It's better "
2086 "you can check the \"fingerprint\" of the key " 2055 "to do this when you are next to your contact, so "
2087 "yourself. Do *not* validate a key if the fingerprint is wrong!" 2056 'you can check the "fingerprint" of the key '
2088 )) 2057 "yourself. Do *not* validate a key if the fingerprint is wrong!"
2058 )
2059 )
2089 2060
2090 own_secret_keys = self.list_secret_keys(client) 2061 own_secret_keys = self.list_secret_keys(client)
2091 2062
2092 trust_ui.change_container("label") 2063 trust_ui.change_container("label")
2093 for index, secret_key in enumerate(own_secret_keys): 2064 for index, secret_key in enumerate(own_secret_keys):
2094 trust_ui.addLabel(D_(f"Own secret key {index} fingerprint")) 2065 trust_ui.addLabel(D_(f"Own secret key {index} fingerprint"))
2095 trust_ui.addText(secret_key.public_key.fingerprint) 2066 trust_ui.addText(secret_key.public_key.fingerprint)
2096 trust_ui.addEmpty() 2067 trust_ui.addEmpty()
2097 trust_ui.addEmpty() 2068 trust_ui.addEmpty()
2098 2069
2099 for outer_index, [ owner, public_keys ] in enumerate(all_public_keys): 2070 for outer_index, [owner, public_keys] in enumerate(all_public_keys):
2100 for inner_index, public_key in enumerate(public_keys): 2071 for inner_index, public_key in enumerate(public_keys):
2101 trust_ui.addLabel(D_("Contact")) 2072 trust_ui.addLabel(D_("Contact"))
2102 trust_ui.addJid(jid.JID(owner)) 2073 trust_ui.addJid(jid.JID(owner))
2103 trust_ui.addLabel(D_("Fingerprint")) 2074 trust_ui.addLabel(D_("Fingerprint"))
2104 trust_ui.addText(public_key.fingerprint) 2075 trust_ui.addText(public_key.fingerprint)
2105 trust_ui.addLabel(D_("Trust this device?")) 2076 trust_ui.addLabel(D_("Trust this device?"))
2106 2077
2107 current_trust_level = await self.get_trust(client, public_key, owner) 2078 current_trust_level = await self.get_trust(client, public_key, owner)
2108 avaiable_trust_levels = \ 2079 avaiable_trust_levels = {
2109 { TrustLevel.DISTRUSTED, TrustLevel.TRUSTED, current_trust_level } 2080 TrustLevel.DISTRUSTED,
2081 TrustLevel.TRUSTED,
2082 current_trust_level,
2083 }
2110 2084
2111 trust_ui.addList( 2085 trust_ui.addList(
2112 f"trust_{outer_index}_{inner_index}", 2086 f"trust_{outer_index}_{inner_index}",
2113 options=[ trust_level.name for trust_level in avaiable_trust_levels ], 2087 options=[trust_level.name for trust_level in avaiable_trust_levels],
2114 selected=current_trust_level.name, 2088 selected=current_trust_level.name,
2115 styles=[ "inline" ] 2089 styles=["inline"],
2116 ) 2090 )
2117 2091
2118 trust_ui.addEmpty() 2092 trust_ui.addEmpty()
2119 trust_ui.addEmpty() 2093 trust_ui.addEmpty()
2120 2094