Mercurial > libervia-backend
diff libervia/backend/plugins/plugin_xep_0420.py @ 4270:0d7bb4df2343
Reformatted code base using black.
author | Goffi <goffi@goffi.org> |
---|---|
date | Wed, 19 Jun 2024 18:44:57 +0200 |
parents | 7c5654c54fed |
children |
line wrap: on
line diff
--- a/libervia/backend/plugins/plugin_xep_0420.py Tue Jun 18 12:06:45 2024 +0200 +++ b/libervia/backend/plugins/plugin_xep_0420.py Wed Jun 19 18:44:57 2024 +0200 @@ -50,7 +50,7 @@ "SCECustomAffix", "SCEAffixPolicy", "SCEProfile", - "SCEAffixValues" + "SCEAffixValues", ] @@ -61,9 +61,9 @@ C.PI_NAME: "SCE", C.PI_IMPORT_NAME: "XEP-0420", C.PI_TYPE: "SEC", - C.PI_PROTOCOLS: [ "XEP-0420" ], - C.PI_DEPENDENCIES: [ "XEP-0334", "XEP-0082" ], - C.PI_RECOMMENDATIONS: [ "XEP-0045", "XEP-0033", "XEP-0359" ], + C.PI_PROTOCOLS: ["XEP-0420"], + C.PI_DEPENDENCIES: ["XEP-0334", "XEP-0082"], + C.PI_RECOMMENDATIONS: ["XEP-0045", "XEP-0033", "XEP-0359"], C.PI_MAIN: "XEP_0420", C.PI_HANDLER: "no", C.PI_DESCRIPTION: D_("Implementation of Stanza Content Encryption"), @@ -240,7 +240,7 @@ NS_ADDRESS, # Not part of the specification (yet), but just doesn't make sense in an encrypted # envelope: - NS_EME + NS_EME, } # Set of (namespace, element name) tuples that define elements which are never allowed @@ -331,8 +331,7 @@ rpad_length = max(0, 53 - content_byte_size_diff) + secrets.randbelow(201) rpad_content = "".join( secrets.choice(string.digits + string.ascii_letters + string.punctuation) - for __ - in range(rpad_length) + for __ in range(rpad_length) ) envelope.addElement((NS_SCE, "rpad"), content=rpad_content) @@ -370,9 +369,7 @@ @staticmethod def unpack_stanza( - profile: SCEProfile, - stanza: domish.Element, - envelope_serialized: bytes + profile: SCEProfile, stanza: domish.Element, envelope_serialized: bytes ) -> SCEAffixValues: """Unpack a stanza packed according to Stanza Content Encryption. @@ -409,18 +406,21 @@ custom_affixes = set(profile.custom_policies.keys()) # Make sure the envelope adheres to the schema - parser = etree.XMLParser(schema=etree.XMLSchema(etree.XML(ENVELOPE_SCHEMA.format( - custom_affix_references="".join( - f'<xs:element ref="{custom_affix.element_name}" minOccurs="0"/>' - for custom_affix - in custom_affixes - ), - custom_affix_definitions="".join( - custom_affix.element_schema - for custom_affix - in custom_affixes + parser = etree.XMLParser( + schema=etree.XMLSchema( + etree.XML( + ENVELOPE_SCHEMA.format( + custom_affix_references="".join( + f'<xs:element ref="{custom_affix.element_name}" minOccurs="0"/>' + for custom_affix in custom_affixes + ), + custom_affix_definitions="".join( + custom_affix.element_schema for custom_affix in custom_affixes + ), + ).encode("utf-8") + ) ) - ).encode("utf-8")))) + ) try: etree.fromstring(envelope_serialized_string, parser) @@ -435,20 +435,16 @@ # Verify the affixes rpad_element = cast( - Optional[domish.Element], - next(envelope.elements(NS_SCE, "rpad"), None) + Optional[domish.Element], next(envelope.elements(NS_SCE, "rpad"), None) ) time_element = cast( - Optional[domish.Element], - next(envelope.elements(NS_SCE, "time"), None) + Optional[domish.Element], next(envelope.elements(NS_SCE, "time"), None) ) to_element = cast( - Optional[domish.Element], - next(envelope.elements(NS_SCE, "to"), None) + Optional[domish.Element], next(envelope.elements(NS_SCE, "to"), None) ) from_element = cast( - Optional[domish.Element], - next(envelope.elements(NS_SCE, "from"), None) + Optional[domish.Element], next(envelope.elements(NS_SCE, "from"), None) ) # The rpad doesn't need verification. @@ -456,8 +452,11 @@ # The time affix isn't verified other than that the timestamp is parseable. try: - timestamp_value = None if time_element is None else \ - XEP_0082.parse_datetime(time_element["stamp"]) + timestamp_value = ( + None + if time_element is None + else XEP_0082.parse_datetime(time_element["stamp"]) + ) except ValueError as e: raise AffixVerificationFailed("Malformed time affix.") from e @@ -513,25 +512,26 @@ element_name = affix.element_name element = cast( Optional[domish.Element], - next(envelope.elements(NS_SCE, element_name), None) + next(envelope.elements(NS_SCE, element_name), None), ) if element is not None: affix.verify(stanza, element) custom_values[affix] = element # Check whether all affixes required by the profile are present - rpad_missing = \ + rpad_missing = ( profile.rpad_policy is SCEAffixPolicy.REQUIRED and rpad_element is None - time_missing = \ + ) + time_missing = ( profile.time_policy is SCEAffixPolicy.REQUIRED and time_element is None - to_missing = \ - profile.to_policy is SCEAffixPolicy.REQUIRED and to_element is None - from_missing = \ + ) + to_missing = profile.to_policy is SCEAffixPolicy.REQUIRED and to_element is None + from_missing = ( profile.from_policy is SCEAffixPolicy.REQUIRED and from_element is None + ) custom_missing = any( affix not in custom_values - for affix, policy - in profile.custom_policies.items() + for affix, policy in profile.custom_policies.items() if policy is SCEAffixPolicy.REQUIRED ) @@ -570,9 +570,5 @@ stanza.addChild(child) return SCEAffixValues( - rpad_value, - timestamp_value, - recipient_value, - sender_value, - custom_values + rpad_value, timestamp_value, recipient_value, sender_value, custom_values )