Mercurial > libervia-backend
comparison libervia/backend/plugins/plugin_xep_0420.py @ 4270:0d7bb4df2343
Reformatted code base using black.
author | Goffi <goffi@goffi.org> |
---|---|
date | Wed, 19 Jun 2024 18:44:57 +0200 |
parents | 7c5654c54fed |
children |
comparison
equal
deleted
inserted
replaced
4269:64a85ce8be70 | 4270:0d7bb4df2343 |
---|---|
48 "ProfileRequirementsNotMet", | 48 "ProfileRequirementsNotMet", |
49 "AffixVerificationFailed", | 49 "AffixVerificationFailed", |
50 "SCECustomAffix", | 50 "SCECustomAffix", |
51 "SCEAffixPolicy", | 51 "SCEAffixPolicy", |
52 "SCEProfile", | 52 "SCEProfile", |
53 "SCEAffixValues" | 53 "SCEAffixValues", |
54 ] | 54 ] |
55 | 55 |
56 | 56 |
57 log = cast(Logger, getLogger(__name__)) # type: ignore[no-untyped-call] | 57 log = cast(Logger, getLogger(__name__)) # type: ignore[no-untyped-call] |
58 | 58 |
59 | 59 |
60 PLUGIN_INFO = { | 60 PLUGIN_INFO = { |
61 C.PI_NAME: "SCE", | 61 C.PI_NAME: "SCE", |
62 C.PI_IMPORT_NAME: "XEP-0420", | 62 C.PI_IMPORT_NAME: "XEP-0420", |
63 C.PI_TYPE: "SEC", | 63 C.PI_TYPE: "SEC", |
64 C.PI_PROTOCOLS: [ "XEP-0420" ], | 64 C.PI_PROTOCOLS: ["XEP-0420"], |
65 C.PI_DEPENDENCIES: [ "XEP-0334", "XEP-0082" ], | 65 C.PI_DEPENDENCIES: ["XEP-0334", "XEP-0082"], |
66 C.PI_RECOMMENDATIONS: [ "XEP-0045", "XEP-0033", "XEP-0359" ], | 66 C.PI_RECOMMENDATIONS: ["XEP-0045", "XEP-0033", "XEP-0359"], |
67 C.PI_MAIN: "XEP_0420", | 67 C.PI_MAIN: "XEP_0420", |
68 C.PI_HANDLER: "no", | 68 C.PI_HANDLER: "no", |
69 C.PI_DESCRIPTION: D_("Implementation of Stanza Content Encryption"), | 69 C.PI_DESCRIPTION: D_("Implementation of Stanza Content Encryption"), |
70 } | 70 } |
71 | 71 |
238 NS_HINTS, | 238 NS_HINTS, |
239 NS_SID, # TODO: Not sure whether this ban applies to both stanza-id and origin-id | 239 NS_SID, # TODO: Not sure whether this ban applies to both stanza-id and origin-id |
240 NS_ADDRESS, | 240 NS_ADDRESS, |
241 # Not part of the specification (yet), but just doesn't make sense in an encrypted | 241 # Not part of the specification (yet), but just doesn't make sense in an encrypted |
242 # envelope: | 242 # envelope: |
243 NS_EME | 243 NS_EME, |
244 } | 244 } |
245 | 245 |
246 # Set of (namespace, element name) tuples that define elements which are never allowed | 246 # Set of (namespace, element name) tuples that define elements which are never allowed |
247 # to be transferred in an encrypted envelope. If all elements under a certain | 247 # to be transferred in an encrypted envelope. If all elements under a certain |
248 # namespace are forbidden, the namespace can be added to | 248 # namespace are forbidden, the namespace can be added to |
329 content_byte_size = len(content.toXml().encode("utf-8")) | 329 content_byte_size = len(content.toXml().encode("utf-8")) |
330 content_byte_size_diff = content_byte_size - empty_content_byte_size | 330 content_byte_size_diff = content_byte_size - empty_content_byte_size |
331 rpad_length = max(0, 53 - content_byte_size_diff) + secrets.randbelow(201) | 331 rpad_length = max(0, 53 - content_byte_size_diff) + secrets.randbelow(201) |
332 rpad_content = "".join( | 332 rpad_content = "".join( |
333 secrets.choice(string.digits + string.ascii_letters + string.punctuation) | 333 secrets.choice(string.digits + string.ascii_letters + string.punctuation) |
334 for __ | 334 for __ in range(rpad_length) |
335 in range(rpad_length) | |
336 ) | 335 ) |
337 envelope.addElement((NS_SCE, "rpad"), content=rpad_content) | 336 envelope.addElement((NS_SCE, "rpad"), content=rpad_content) |
338 | 337 |
339 if profile.time_policy is not SCEAffixPolicy.NOT_NEEDED: | 338 if profile.time_policy is not SCEAffixPolicy.NOT_NEEDED: |
340 time_element = envelope.addElement((NS_SCE, "time")) | 339 time_element = envelope.addElement((NS_SCE, "time")) |
368 | 367 |
369 return envelope.toXml().encode("utf-8") | 368 return envelope.toXml().encode("utf-8") |
370 | 369 |
371 @staticmethod | 370 @staticmethod |
372 def unpack_stanza( | 371 def unpack_stanza( |
373 profile: SCEProfile, | 372 profile: SCEProfile, stanza: domish.Element, envelope_serialized: bytes |
374 stanza: domish.Element, | |
375 envelope_serialized: bytes | |
376 ) -> SCEAffixValues: | 373 ) -> SCEAffixValues: |
377 """Unpack a stanza packed according to Stanza Content Encryption. | 374 """Unpack a stanza packed according to Stanza Content Encryption. |
378 | 375 |
379 Parses the serialized envelope as XML, verifies included affixes and makes sure | 376 Parses the serialized envelope as XML, verifies included affixes and makes sure |
380 the requirements of the profile are met, and restores the stanza by moving | 377 the requirements of the profile are met, and restores the stanza by moving |
407 ) from e | 404 ) from e |
408 | 405 |
409 custom_affixes = set(profile.custom_policies.keys()) | 406 custom_affixes = set(profile.custom_policies.keys()) |
410 | 407 |
411 # Make sure the envelope adheres to the schema | 408 # Make sure the envelope adheres to the schema |
412 parser = etree.XMLParser(schema=etree.XMLSchema(etree.XML(ENVELOPE_SCHEMA.format( | 409 parser = etree.XMLParser( |
413 custom_affix_references="".join( | 410 schema=etree.XMLSchema( |
414 f'<xs:element ref="{custom_affix.element_name}" minOccurs="0"/>' | 411 etree.XML( |
415 for custom_affix | 412 ENVELOPE_SCHEMA.format( |
416 in custom_affixes | 413 custom_affix_references="".join( |
417 ), | 414 f'<xs:element ref="{custom_affix.element_name}" minOccurs="0"/>' |
418 custom_affix_definitions="".join( | 415 for custom_affix in custom_affixes |
419 custom_affix.element_schema | 416 ), |
420 for custom_affix | 417 custom_affix_definitions="".join( |
421 in custom_affixes | 418 custom_affix.element_schema for custom_affix in custom_affixes |
419 ), | |
420 ).encode("utf-8") | |
421 ) | |
422 ) | 422 ) |
423 ).encode("utf-8")))) | 423 ) |
424 | 424 |
425 try: | 425 try: |
426 etree.fromstring(envelope_serialized_string, parser) | 426 etree.fromstring(envelope_serialized_string, parser) |
427 except etree.XMLSyntaxError as e: | 427 except etree.XMLSyntaxError as e: |
428 raise exceptions.ParsingError( | 428 raise exceptions.ParsingError( |
433 envelope = cast(domish.Element, ElementParser()(envelope_serialized_string)) | 433 envelope = cast(domish.Element, ElementParser()(envelope_serialized_string)) |
434 content = next(envelope.elements(NS_SCE, "content")) | 434 content = next(envelope.elements(NS_SCE, "content")) |
435 | 435 |
436 # Verify the affixes | 436 # Verify the affixes |
437 rpad_element = cast( | 437 rpad_element = cast( |
438 Optional[domish.Element], | 438 Optional[domish.Element], next(envelope.elements(NS_SCE, "rpad"), None) |
439 next(envelope.elements(NS_SCE, "rpad"), None) | |
440 ) | 439 ) |
441 time_element = cast( | 440 time_element = cast( |
442 Optional[domish.Element], | 441 Optional[domish.Element], next(envelope.elements(NS_SCE, "time"), None) |
443 next(envelope.elements(NS_SCE, "time"), None) | |
444 ) | 442 ) |
445 to_element = cast( | 443 to_element = cast( |
446 Optional[domish.Element], | 444 Optional[domish.Element], next(envelope.elements(NS_SCE, "to"), None) |
447 next(envelope.elements(NS_SCE, "to"), None) | |
448 ) | 445 ) |
449 from_element = cast( | 446 from_element = cast( |
450 Optional[domish.Element], | 447 Optional[domish.Element], next(envelope.elements(NS_SCE, "from"), None) |
451 next(envelope.elements(NS_SCE, "from"), None) | |
452 ) | 448 ) |
453 | 449 |
454 # The rpad doesn't need verification. | 450 # The rpad doesn't need verification. |
455 rpad_value = None if rpad_element is None else str(rpad_element) | 451 rpad_value = None if rpad_element is None else str(rpad_element) |
456 | 452 |
457 # The time affix isn't verified other than that the timestamp is parseable. | 453 # The time affix isn't verified other than that the timestamp is parseable. |
458 try: | 454 try: |
459 timestamp_value = None if time_element is None else \ | 455 timestamp_value = ( |
460 XEP_0082.parse_datetime(time_element["stamp"]) | 456 None |
457 if time_element is None | |
458 else XEP_0082.parse_datetime(time_element["stamp"]) | |
459 ) | |
461 except ValueError as e: | 460 except ValueError as e: |
462 raise AffixVerificationFailed("Malformed time affix.") from e | 461 raise AffixVerificationFailed("Malformed time affix.") from e |
463 | 462 |
464 # The to affix is verified by comparing the to attribute of the stanza with the | 463 # The to affix is verified by comparing the to attribute of the stanza with the |
465 # JID referenced by the affix. Note that only bare JIDs are compared as per the | 464 # JID referenced by the affix. Note that only bare JIDs are compared as per the |
511 custom_values: Dict[SCECustomAffix, domish.Element] = {} | 510 custom_values: Dict[SCECustomAffix, domish.Element] = {} |
512 for affix in custom_affixes: | 511 for affix in custom_affixes: |
513 element_name = affix.element_name | 512 element_name = affix.element_name |
514 element = cast( | 513 element = cast( |
515 Optional[domish.Element], | 514 Optional[domish.Element], |
516 next(envelope.elements(NS_SCE, element_name), None) | 515 next(envelope.elements(NS_SCE, element_name), None), |
517 ) | 516 ) |
518 if element is not None: | 517 if element is not None: |
519 affix.verify(stanza, element) | 518 affix.verify(stanza, element) |
520 custom_values[affix] = element | 519 custom_values[affix] = element |
521 | 520 |
522 # Check whether all affixes required by the profile are present | 521 # Check whether all affixes required by the profile are present |
523 rpad_missing = \ | 522 rpad_missing = ( |
524 profile.rpad_policy is SCEAffixPolicy.REQUIRED and rpad_element is None | 523 profile.rpad_policy is SCEAffixPolicy.REQUIRED and rpad_element is None |
525 time_missing = \ | 524 ) |
525 time_missing = ( | |
526 profile.time_policy is SCEAffixPolicy.REQUIRED and time_element is None | 526 profile.time_policy is SCEAffixPolicy.REQUIRED and time_element is None |
527 to_missing = \ | 527 ) |
528 profile.to_policy is SCEAffixPolicy.REQUIRED and to_element is None | 528 to_missing = profile.to_policy is SCEAffixPolicy.REQUIRED and to_element is None |
529 from_missing = \ | 529 from_missing = ( |
530 profile.from_policy is SCEAffixPolicy.REQUIRED and from_element is None | 530 profile.from_policy is SCEAffixPolicy.REQUIRED and from_element is None |
531 ) | |
531 custom_missing = any( | 532 custom_missing = any( |
532 affix not in custom_values | 533 affix not in custom_values |
533 for affix, policy | 534 for affix, policy in profile.custom_policies.items() |
534 in profile.custom_policies.items() | |
535 if policy is SCEAffixPolicy.REQUIRED | 535 if policy is SCEAffixPolicy.REQUIRED |
536 ) | 536 ) |
537 | 537 |
538 if rpad_missing or time_missing or to_missing or from_missing or custom_missing: | 538 if rpad_missing or time_missing or to_missing or from_missing or custom_missing: |
539 custom_missing_string = "" | 539 custom_missing_string = "" |
568 | 568 |
569 # Add the child to the stanza | 569 # Add the child to the stanza |
570 stanza.addChild(child) | 570 stanza.addChild(child) |
571 | 571 |
572 return SCEAffixValues( | 572 return SCEAffixValues( |
573 rpad_value, | 573 rpad_value, timestamp_value, recipient_value, sender_value, custom_values |
574 timestamp_value, | 574 ) |
575 recipient_value, | |
576 sender_value, | |
577 custom_values | |
578 ) |