Mercurial > libervia-backend
diff sat/plugins/plugin_xep_0420.py @ 3911:8289ac1b34f4
plugin XEP-0384: Fully reworked to adjust to the reworked python-omemo:
- support for both (modern) OMEMO under the `urn:xmpp:omemo:2` namespace and (legacy) OMEMO under the `eu.siacs.conversations.axolotl` namespace
- maintains one identity across both versions of OMEMO
- migrates data from the old plugin
- includes more features for protocol stability
- uses SCE for modern OMEMO
- fully type-checked, linted and format-checked
- added type hints to various pieces of backend code used by the plugin
- added stubs for some Twisted APIs used by the plugin under stubs/ (use `export MYPYPATH=stubs/` before running mypy)
- core (xmpp): enabled `send` trigger and made it an asyncPoint
fix 375
author | Syndace <me@syndace.dev> |
---|---|
date | Tue, 23 Aug 2022 21:06:24 +0200 |
parents | 00212260f659 |
children | 626629781a53 |
line wrap: on
line diff
--- a/sat/plugins/plugin_xep_0420.py Thu Sep 22 12:03:12 2022 +0200 +++ b/sat/plugins/plugin_xep_0420.py Tue Aug 23 21:06:24 2022 +0200 @@ -16,15 +16,12 @@ # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. -# Type-check with `mypy --strict --disable-error-code no-untyped-call` -# Lint with `pylint` - from abc import ABC, abstractmethod from datetime import datetime import enum import secrets import string -from typing import Dict, Iterator, List, NamedTuple, Optional, Set, Tuple, Union, cast +from typing import Dict, NamedTuple, Optional, Set, Tuple, cast from lxml import etree @@ -55,7 +52,7 @@ ] -log = cast(Logger, getLogger(__name__)) +log = cast(Logger, getLogger(__name__)) # type: ignore[no-untyped-call] PLUGIN_INFO = { @@ -291,19 +288,15 @@ # Note the serialized byte size of the content element before adding any children empty_content_byte_size = len(content.toXml().encode("utf-8")) - # Just for type safety - stanza_children = cast(List[Union[domish.Element, str]], stanza.children) - content_children = cast(List[Union[domish.Element, str]], content.children) - # Move elements that are not explicitly forbidden from being encrypted from the # stanza to the content element. - for child in list(cast(Iterator[domish.Element], stanza.elements())): + for child in list(stanza.elements()): if ( child.uri not in XEP_0420.MUST_BE_PLAINTEXT_NAMESPACES and (child.uri, child.name) not in XEP_0420.MUST_BE_PLAINTEXT_ELEMENTS ): # Remove the child from the stanza - stanza_children.remove(child) + stanza.children.remove(child) # A namespace of ``None`` can be used on domish elements to inherit the # namespace from the parent. When moving elements from the stanza root to @@ -316,7 +309,7 @@ child.defaultUri = C.NS_CLIENT # Add the child with corrected namespaces to the content element - content_children.append(child) + content.addChild(child) # Add the affixes requested by the profile if profile.rpad_policy is not SCEAffixPolicy.NOT_NEEDED: @@ -345,7 +338,7 @@ time_element["stamp"] = XEP_0082.format_datetime() if profile.to_policy is not SCEAffixPolicy.NOT_NEEDED: - recipient = cast(Optional[str], stanza.getAttribute("to", None)) + recipient = stanza.getAttribute("to", None) if recipient is None: raise ValueError( "<to/> affix requested, but stanza doesn't have the 'to' attribute" @@ -356,7 +349,7 @@ to_element["jid"] = jid.JID(recipient).userhost() if profile.from_policy is not SCEAffixPolicy.NOT_NEEDED: - sender = cast(Optional[str], stanza.getAttribute("from", None)) + sender = stanza.getAttribute("from", None) if sender is None: raise ValueError( "<from/> affix requested, but stanza doesn't have the 'from'" @@ -370,7 +363,7 @@ if policy is not SCEAffixPolicy.NOT_NEEDED: envelope.addChild(affix.create(stanza)) - return cast(str, envelope.toXml()).encode("utf-8") + return envelope.toXml().encode("utf-8") @staticmethod def unpack_stanza( @@ -431,7 +424,7 @@ # Prepare the envelope and content elements envelope = cast(domish.Element, ElementParser()(envelope_serialized_string)) - content = cast(domish.Element, next(envelope.elements(NS_SCE, "content"))) + content = next(envelope.elements(NS_SCE, "content")) # Verify the affixes rpad_element = cast( @@ -468,7 +461,7 @@ if to_element is not None: recipient_value = jid.JID(to_element["jid"]) - recipient_actual = cast(Optional[str], stanza.getAttribute("to", None)) + recipient_actual = stanza.getAttribute("to", None) if recipient_actual is None: raise AffixVerificationFailed( "'To' affix is included in the envelope, but the stanza is lacking a" @@ -491,7 +484,7 @@ if from_element is not None: sender_value = jid.JID(from_element["jid"]) - sender_actual = cast(Optional[str], stanza.getAttribute("from", None)) + sender_actual = stanza.getAttribute("from", None) if sender_actual is None: raise AffixVerificationFailed( "'From' affix is included in the envelope, but the stanza is lacking" @@ -551,13 +544,9 @@ + custom_missing_string ) - # Just for type safety - content_children = cast(List[Union[domish.Element, str]], content.children) - stanza_children = cast(List[Union[domish.Element, str]], stanza.children) - # Move elements that are not explicitly forbidden from being encrypted from the # content element to the stanza. - for child in list(cast(Iterator[domish.Element], content.elements())): + for child in list(content.elements()): if ( child.uri in XEP_0420.MUST_BE_PLAINTEXT_NAMESPACES or (child.uri, child.name) in XEP_0420.MUST_BE_PLAINTEXT_ELEMENTS @@ -568,10 +557,10 @@ ) else: # Remove the child from the content element - content_children.remove(child) + content.children.remove(child) # Add the child to the stanza - stanza_children.append(child) + stanza.addChild(child) return SCEAffixValues( rpad_value,