diff sat/plugins/plugin_xep_0420.py @ 3911:8289ac1b34f4

plugin XEP-0384: Fully reworked to adjust to the reworked python-omemo: - support for both (modern) OMEMO under the `urn:xmpp:omemo:2` namespace and (legacy) OMEMO under the `eu.siacs.conversations.axolotl` namespace - maintains one identity across both versions of OMEMO - migrates data from the old plugin - includes more features for protocol stability - uses SCE for modern OMEMO - fully type-checked, linted and format-checked - added type hints to various pieces of backend code used by the plugin - added stubs for some Twisted APIs used by the plugin under stubs/ (use `export MYPYPATH=stubs/` before running mypy) - core (xmpp): enabled `send` trigger and made it an asyncPoint fix 375
author Syndace <me@syndace.dev>
date Tue, 23 Aug 2022 21:06:24 +0200
parents 00212260f659
children 626629781a53
line wrap: on
line diff
--- a/sat/plugins/plugin_xep_0420.py	Thu Sep 22 12:03:12 2022 +0200
+++ b/sat/plugins/plugin_xep_0420.py	Tue Aug 23 21:06:24 2022 +0200
@@ -16,15 +16,12 @@
 # You should have received a copy of the GNU Affero General Public License
 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
-# Type-check with `mypy --strict --disable-error-code no-untyped-call`
-# Lint with `pylint`
-
 from abc import ABC, abstractmethod
 from datetime import datetime
 import enum
 import secrets
 import string
-from typing import Dict, Iterator, List, NamedTuple, Optional, Set, Tuple, Union, cast
+from typing import Dict, NamedTuple, Optional, Set, Tuple, cast
 
 from lxml import etree
 
@@ -55,7 +52,7 @@
 ]
 
 
-log = cast(Logger, getLogger(__name__))
+log = cast(Logger, getLogger(__name__))  # type: ignore[no-untyped-call]
 
 
 PLUGIN_INFO = {
@@ -291,19 +288,15 @@
         # Note the serialized byte size of the content element before adding any children
         empty_content_byte_size = len(content.toXml().encode("utf-8"))
 
-        # Just for type safety
-        stanza_children = cast(List[Union[domish.Element, str]], stanza.children)
-        content_children = cast(List[Union[domish.Element, str]], content.children)
-
         # Move elements that are not explicitly forbidden from being encrypted from the
         # stanza to the content element.
-        for child in list(cast(Iterator[domish.Element], stanza.elements())):
+        for child in list(stanza.elements()):
             if (
                 child.uri not in XEP_0420.MUST_BE_PLAINTEXT_NAMESPACES
                 and (child.uri, child.name) not in XEP_0420.MUST_BE_PLAINTEXT_ELEMENTS
             ):
                 # Remove the child from the stanza
-                stanza_children.remove(child)
+                stanza.children.remove(child)
 
                 # A namespace of ``None`` can be used on domish elements to inherit the
                 # namespace from the parent. When moving elements from the stanza root to
@@ -316,7 +309,7 @@
                     child.defaultUri = C.NS_CLIENT
 
                 # Add the child with corrected namespaces to the content element
-                content_children.append(child)
+                content.addChild(child)
 
         # Add the affixes requested by the profile
         if profile.rpad_policy is not SCEAffixPolicy.NOT_NEEDED:
@@ -345,7 +338,7 @@
             time_element["stamp"] = XEP_0082.format_datetime()
 
         if profile.to_policy is not SCEAffixPolicy.NOT_NEEDED:
-            recipient = cast(Optional[str], stanza.getAttribute("to", None))
+            recipient = stanza.getAttribute("to", None)
             if recipient is None:
                 raise ValueError(
                     "<to/> affix requested, but stanza doesn't have the 'to' attribute"
@@ -356,7 +349,7 @@
             to_element["jid"] = jid.JID(recipient).userhost()
 
         if profile.from_policy is not SCEAffixPolicy.NOT_NEEDED:
-            sender = cast(Optional[str], stanza.getAttribute("from", None))
+            sender = stanza.getAttribute("from", None)
             if sender is None:
                 raise ValueError(
                     "<from/> affix requested, but stanza doesn't have the 'from'"
@@ -370,7 +363,7 @@
             if policy is not SCEAffixPolicy.NOT_NEEDED:
                 envelope.addChild(affix.create(stanza))
 
-        return cast(str, envelope.toXml()).encode("utf-8")
+        return envelope.toXml().encode("utf-8")
 
     @staticmethod
     def unpack_stanza(
@@ -431,7 +424,7 @@
 
         # Prepare the envelope and content elements
         envelope = cast(domish.Element, ElementParser()(envelope_serialized_string))
-        content = cast(domish.Element, next(envelope.elements(NS_SCE, "content")))
+        content = next(envelope.elements(NS_SCE, "content"))
 
         # Verify the affixes
         rpad_element = cast(
@@ -468,7 +461,7 @@
         if to_element is not None:
             recipient_value = jid.JID(to_element["jid"])
 
-            recipient_actual = cast(Optional[str], stanza.getAttribute("to", None))
+            recipient_actual = stanza.getAttribute("to", None)
             if recipient_actual is None:
                 raise AffixVerificationFailed(
                     "'To' affix is included in the envelope, but the stanza is lacking a"
@@ -491,7 +484,7 @@
         if from_element is not None:
             sender_value = jid.JID(from_element["jid"])
 
-            sender_actual = cast(Optional[str], stanza.getAttribute("from", None))
+            sender_actual = stanza.getAttribute("from", None)
             if sender_actual is None:
                 raise AffixVerificationFailed(
                     "'From' affix is included in the envelope, but the stanza is lacking"
@@ -551,13 +544,9 @@
                 + custom_missing_string
             )
 
-        # Just for type safety
-        content_children = cast(List[Union[domish.Element, str]], content.children)
-        stanza_children = cast(List[Union[domish.Element, str]], stanza.children)
-
         # Move elements that are not explicitly forbidden from being encrypted from the
         # content element to the stanza.
-        for child in list(cast(Iterator[domish.Element], content.elements())):
+        for child in list(content.elements()):
             if (
                 child.uri in XEP_0420.MUST_BE_PLAINTEXT_NAMESPACES
                 or (child.uri, child.name) in XEP_0420.MUST_BE_PLAINTEXT_ELEMENTS
@@ -568,10 +557,10 @@
                 )
             else:
                 # Remove the child from the content element
-                content_children.remove(child)
+                content.children.remove(child)
 
                 # Add the child to the stanza
-                stanza_children.append(child)
+                stanza.addChild(child)
 
         return SCEAffixValues(
             rpad_value,