comparison libervia/backend/plugins/plugin_xep_0420.py @ 4270:0d7bb4df2343

Reformatted code base using black.
author Goffi <goffi@goffi.org>
date Wed, 19 Jun 2024 18:44:57 +0200
parents 7c5654c54fed
children
comparison
equal deleted inserted replaced
4269:64a85ce8be70 4270:0d7bb4df2343
48 "ProfileRequirementsNotMet", 48 "ProfileRequirementsNotMet",
49 "AffixVerificationFailed", 49 "AffixVerificationFailed",
50 "SCECustomAffix", 50 "SCECustomAffix",
51 "SCEAffixPolicy", 51 "SCEAffixPolicy",
52 "SCEProfile", 52 "SCEProfile",
53 "SCEAffixValues" 53 "SCEAffixValues",
54 ] 54 ]
55 55
56 56
57 log = cast(Logger, getLogger(__name__)) # type: ignore[no-untyped-call] 57 log = cast(Logger, getLogger(__name__)) # type: ignore[no-untyped-call]
58 58
59 59
60 PLUGIN_INFO = { 60 PLUGIN_INFO = {
61 C.PI_NAME: "SCE", 61 C.PI_NAME: "SCE",
62 C.PI_IMPORT_NAME: "XEP-0420", 62 C.PI_IMPORT_NAME: "XEP-0420",
63 C.PI_TYPE: "SEC", 63 C.PI_TYPE: "SEC",
64 C.PI_PROTOCOLS: [ "XEP-0420" ], 64 C.PI_PROTOCOLS: ["XEP-0420"],
65 C.PI_DEPENDENCIES: [ "XEP-0334", "XEP-0082" ], 65 C.PI_DEPENDENCIES: ["XEP-0334", "XEP-0082"],
66 C.PI_RECOMMENDATIONS: [ "XEP-0045", "XEP-0033", "XEP-0359" ], 66 C.PI_RECOMMENDATIONS: ["XEP-0045", "XEP-0033", "XEP-0359"],
67 C.PI_MAIN: "XEP_0420", 67 C.PI_MAIN: "XEP_0420",
68 C.PI_HANDLER: "no", 68 C.PI_HANDLER: "no",
69 C.PI_DESCRIPTION: D_("Implementation of Stanza Content Encryption"), 69 C.PI_DESCRIPTION: D_("Implementation of Stanza Content Encryption"),
70 } 70 }
71 71
238 NS_HINTS, 238 NS_HINTS,
239 NS_SID, # TODO: Not sure whether this ban applies to both stanza-id and origin-id 239 NS_SID, # TODO: Not sure whether this ban applies to both stanza-id and origin-id
240 NS_ADDRESS, 240 NS_ADDRESS,
241 # Not part of the specification (yet), but just doesn't make sense in an encrypted 241 # Not part of the specification (yet), but just doesn't make sense in an encrypted
242 # envelope: 242 # envelope:
243 NS_EME 243 NS_EME,
244 } 244 }
245 245
246 # Set of (namespace, element name) tuples that define elements which are never allowed 246 # Set of (namespace, element name) tuples that define elements which are never allowed
247 # to be transferred in an encrypted envelope. If all elements under a certain 247 # to be transferred in an encrypted envelope. If all elements under a certain
248 # namespace are forbidden, the namespace can be added to 248 # namespace are forbidden, the namespace can be added to
329 content_byte_size = len(content.toXml().encode("utf-8")) 329 content_byte_size = len(content.toXml().encode("utf-8"))
330 content_byte_size_diff = content_byte_size - empty_content_byte_size 330 content_byte_size_diff = content_byte_size - empty_content_byte_size
331 rpad_length = max(0, 53 - content_byte_size_diff) + secrets.randbelow(201) 331 rpad_length = max(0, 53 - content_byte_size_diff) + secrets.randbelow(201)
332 rpad_content = "".join( 332 rpad_content = "".join(
333 secrets.choice(string.digits + string.ascii_letters + string.punctuation) 333 secrets.choice(string.digits + string.ascii_letters + string.punctuation)
334 for __ 334 for __ in range(rpad_length)
335 in range(rpad_length)
336 ) 335 )
337 envelope.addElement((NS_SCE, "rpad"), content=rpad_content) 336 envelope.addElement((NS_SCE, "rpad"), content=rpad_content)
338 337
339 if profile.time_policy is not SCEAffixPolicy.NOT_NEEDED: 338 if profile.time_policy is not SCEAffixPolicy.NOT_NEEDED:
340 time_element = envelope.addElement((NS_SCE, "time")) 339 time_element = envelope.addElement((NS_SCE, "time"))
368 367
369 return envelope.toXml().encode("utf-8") 368 return envelope.toXml().encode("utf-8")
370 369
371 @staticmethod 370 @staticmethod
372 def unpack_stanza( 371 def unpack_stanza(
373 profile: SCEProfile, 372 profile: SCEProfile, stanza: domish.Element, envelope_serialized: bytes
374 stanza: domish.Element,
375 envelope_serialized: bytes
376 ) -> SCEAffixValues: 373 ) -> SCEAffixValues:
377 """Unpack a stanza packed according to Stanza Content Encryption. 374 """Unpack a stanza packed according to Stanza Content Encryption.
378 375
379 Parses the serialized envelope as XML, verifies included affixes and makes sure 376 Parses the serialized envelope as XML, verifies included affixes and makes sure
380 the requirements of the profile are met, and restores the stanza by moving 377 the requirements of the profile are met, and restores the stanza by moving
407 ) from e 404 ) from e
408 405
409 custom_affixes = set(profile.custom_policies.keys()) 406 custom_affixes = set(profile.custom_policies.keys())
410 407
411 # Make sure the envelope adheres to the schema 408 # Make sure the envelope adheres to the schema
412 parser = etree.XMLParser(schema=etree.XMLSchema(etree.XML(ENVELOPE_SCHEMA.format( 409 parser = etree.XMLParser(
413 custom_affix_references="".join( 410 schema=etree.XMLSchema(
414 f'<xs:element ref="{custom_affix.element_name}" minOccurs="0"/>' 411 etree.XML(
415 for custom_affix 412 ENVELOPE_SCHEMA.format(
416 in custom_affixes 413 custom_affix_references="".join(
417 ), 414 f'<xs:element ref="{custom_affix.element_name}" minOccurs="0"/>'
418 custom_affix_definitions="".join( 415 for custom_affix in custom_affixes
419 custom_affix.element_schema 416 ),
420 for custom_affix 417 custom_affix_definitions="".join(
421 in custom_affixes 418 custom_affix.element_schema for custom_affix in custom_affixes
419 ),
420 ).encode("utf-8")
421 )
422 ) 422 )
423 ).encode("utf-8")))) 423 )
424 424
425 try: 425 try:
426 etree.fromstring(envelope_serialized_string, parser) 426 etree.fromstring(envelope_serialized_string, parser)
427 except etree.XMLSyntaxError as e: 427 except etree.XMLSyntaxError as e:
428 raise exceptions.ParsingError( 428 raise exceptions.ParsingError(
433 envelope = cast(domish.Element, ElementParser()(envelope_serialized_string)) 433 envelope = cast(domish.Element, ElementParser()(envelope_serialized_string))
434 content = next(envelope.elements(NS_SCE, "content")) 434 content = next(envelope.elements(NS_SCE, "content"))
435 435
436 # Verify the affixes 436 # Verify the affixes
437 rpad_element = cast( 437 rpad_element = cast(
438 Optional[domish.Element], 438 Optional[domish.Element], next(envelope.elements(NS_SCE, "rpad"), None)
439 next(envelope.elements(NS_SCE, "rpad"), None)
440 ) 439 )
441 time_element = cast( 440 time_element = cast(
442 Optional[domish.Element], 441 Optional[domish.Element], next(envelope.elements(NS_SCE, "time"), None)
443 next(envelope.elements(NS_SCE, "time"), None)
444 ) 442 )
445 to_element = cast( 443 to_element = cast(
446 Optional[domish.Element], 444 Optional[domish.Element], next(envelope.elements(NS_SCE, "to"), None)
447 next(envelope.elements(NS_SCE, "to"), None)
448 ) 445 )
449 from_element = cast( 446 from_element = cast(
450 Optional[domish.Element], 447 Optional[domish.Element], next(envelope.elements(NS_SCE, "from"), None)
451 next(envelope.elements(NS_SCE, "from"), None)
452 ) 448 )
453 449
454 # The rpad doesn't need verification. 450 # The rpad doesn't need verification.
455 rpad_value = None if rpad_element is None else str(rpad_element) 451 rpad_value = None if rpad_element is None else str(rpad_element)
456 452
457 # The time affix isn't verified other than that the timestamp is parseable. 453 # The time affix isn't verified other than that the timestamp is parseable.
458 try: 454 try:
459 timestamp_value = None if time_element is None else \ 455 timestamp_value = (
460 XEP_0082.parse_datetime(time_element["stamp"]) 456 None
457 if time_element is None
458 else XEP_0082.parse_datetime(time_element["stamp"])
459 )
461 except ValueError as e: 460 except ValueError as e:
462 raise AffixVerificationFailed("Malformed time affix.") from e 461 raise AffixVerificationFailed("Malformed time affix.") from e
463 462
464 # The to affix is verified by comparing the to attribute of the stanza with the 463 # The to affix is verified by comparing the to attribute of the stanza with the
465 # JID referenced by the affix. Note that only bare JIDs are compared as per the 464 # JID referenced by the affix. Note that only bare JIDs are compared as per the
511 custom_values: Dict[SCECustomAffix, domish.Element] = {} 510 custom_values: Dict[SCECustomAffix, domish.Element] = {}
512 for affix in custom_affixes: 511 for affix in custom_affixes:
513 element_name = affix.element_name 512 element_name = affix.element_name
514 element = cast( 513 element = cast(
515 Optional[domish.Element], 514 Optional[domish.Element],
516 next(envelope.elements(NS_SCE, element_name), None) 515 next(envelope.elements(NS_SCE, element_name), None),
517 ) 516 )
518 if element is not None: 517 if element is not None:
519 affix.verify(stanza, element) 518 affix.verify(stanza, element)
520 custom_values[affix] = element 519 custom_values[affix] = element
521 520
522 # Check whether all affixes required by the profile are present 521 # Check whether all affixes required by the profile are present
523 rpad_missing = \ 522 rpad_missing = (
524 profile.rpad_policy is SCEAffixPolicy.REQUIRED and rpad_element is None 523 profile.rpad_policy is SCEAffixPolicy.REQUIRED and rpad_element is None
525 time_missing = \ 524 )
525 time_missing = (
526 profile.time_policy is SCEAffixPolicy.REQUIRED and time_element is None 526 profile.time_policy is SCEAffixPolicy.REQUIRED and time_element is None
527 to_missing = \ 527 )
528 profile.to_policy is SCEAffixPolicy.REQUIRED and to_element is None 528 to_missing = profile.to_policy is SCEAffixPolicy.REQUIRED and to_element is None
529 from_missing = \ 529 from_missing = (
530 profile.from_policy is SCEAffixPolicy.REQUIRED and from_element is None 530 profile.from_policy is SCEAffixPolicy.REQUIRED and from_element is None
531 )
531 custom_missing = any( 532 custom_missing = any(
532 affix not in custom_values 533 affix not in custom_values
533 for affix, policy 534 for affix, policy in profile.custom_policies.items()
534 in profile.custom_policies.items()
535 if policy is SCEAffixPolicy.REQUIRED 535 if policy is SCEAffixPolicy.REQUIRED
536 ) 536 )
537 537
538 if rpad_missing or time_missing or to_missing or from_missing or custom_missing: 538 if rpad_missing or time_missing or to_missing or from_missing or custom_missing:
539 custom_missing_string = "" 539 custom_missing_string = ""
568 568
569 # Add the child to the stanza 569 # Add the child to the stanza
570 stanza.addChild(child) 570 stanza.addChild(child)
571 571
572 return SCEAffixValues( 572 return SCEAffixValues(
573 rpad_value, 573 rpad_value, timestamp_value, recipient_value, sender_value, custom_values
574 timestamp_value, 574 )
575 recipient_value,
576 sender_value,
577 custom_values
578 )