comparison libervia/backend/plugins/plugin_sec_pte.py @ 4270:0d7bb4df2343

Reformatted code base using black.
author Goffi <goffi@goffi.org>
date Wed, 19 Jun 2024 18:44:57 +0200
parents 4b842c1fb686
children
comparison
equal deleted inserted replaced
4269:64a85ce8be70 4270:0d7bb4df2343
70 service: jid.JID, 70 service: jid.JID,
71 node: str, 71 node: str,
72 items: Optional[List[domish.Element]], 72 items: Optional[List[domish.Element]],
73 options: Optional[dict], 73 options: Optional[dict],
74 sender: jid.JID, 74 sender: jid.JID,
75 extra: Dict[str, Any] 75 extra: Dict[str, Any],
76 ) -> bool: 76 ) -> bool:
77 if not items or extra.get("encrypted_for") is None: 77 if not items or extra.get("encrypted_for") is None:
78 return True 78 return True
79 encrypt_data = extra["encrypted_for"] 79 encrypt_data = extra["encrypted_for"]
80 try: 80 try:
93 client, 93 client,
94 self._o.NS_TWOMEMO, 94 self._o.NS_TWOMEMO,
95 item, 95 item,
96 targets, 96 targets,
97 is_muc_message=False, 97 is_muc_message=False,
98 stanza_id=None 98 stanza_id=None,
99 ) 99 )
100 item_elts = list(item.elements()) 100 item_elts = list(item.elements())
101 if len(item_elts) != 1: 101 if len(item_elts) != 1:
102 raise ValueError( 102 raise ValueError(
103 f"there should be exactly one item payload: {item.toXml()}" 103 f"there should be exactly one item payload: {item.toXml()}"
124 return True 124 return True
125 if service is None: 125 if service is None:
126 service = client.jid.userhostJID() 126 service = client.jid.userhostJID()
127 for item in items: 127 for item in items:
128 payload = item.firstChildElement() 128 payload = item.firstChildElement()
129 if (payload is not None 129 if (
130 payload is not None
130 and payload.name == "encrypted" 131 and payload.name == "encrypted"
131 and payload.uri == NS_PTE): 132 and payload.uri == NS_PTE
133 ):
132 encrypted_elt = payload 134 encrypted_elt = payload
133 item.children.clear() 135 item.children.clear()
134 try: 136 try:
135 encryption_type = encrypted_elt.getAttribute("type") 137 encryption_type = encrypted_elt.getAttribute("type")
136 encrypted_by = jid.JID(encrypted_elt["by"]) 138 encrypted_by = jid.JID(encrypted_elt["by"])
137 except (KeyError, RuntimeError): 139 except (KeyError, RuntimeError):
138 raise exceptions.DataError( 140 raise exceptions.DataError(
139 f"invalid <encrypted> element: {encrypted_elt.toXml()}" 141 f"invalid <encrypted> element: {encrypted_elt.toXml()}"
140 ) 142 )
141 if encryption_type!= self._o.NS_TWOMEMO: 143 if encryption_type != self._o.NS_TWOMEMO:
142 raise NotImplementedError("only TWOMEMO is supported for now") 144 raise NotImplementedError("only TWOMEMO is supported for now")
143 log.debug(f"decrypting item {item.getAttribute('id', '')}") 145 log.debug(f"decrypting item {item.getAttribute('id', '')}")
144 146
145 # FIXME: we do use _message_received_trigger now to decrypt the stanza, a 147 # FIXME: we do use _message_received_trigger now to decrypt the stanza, a
146 # cleaner separated decrypt method should be used 148 # cleaner separated decrypt method should be used
147 encrypted_elt["from"] = encrypted_by.full() 149 encrypted_elt["from"] = encrypted_by.full()
148 if not await self._o._message_received_trigger( 150 if (
149 client, 151 not await self._o._message_received_trigger(
150 encrypted_elt, 152 client, encrypted_elt, defer.Deferred()
151 defer.Deferred() 153 )
152 ) or not encrypted_elt.children: 154 or not encrypted_elt.children
155 ):
153 raise exceptions.EncryptionError("can't decrypt the message") 156 raise exceptions.EncryptionError("can't decrypt the message")
154 157
155 item.addChild(encrypted_elt.firstChildElement()) 158 item.addChild(encrypted_elt.firstChildElement())
156 159
157 extra.setdefault("encrypted", {})[item["id"]] = { 160 extra.setdefault("encrypted", {})[item["id"]] = {
158 "type": NS_PTE, 161 "type": NS_PTE,
159 "algorithm": encryption_type 162 "algorithm": encryption_type,
160 } 163 }
161 return True 164 return True
162 165
163 166
164 @implementer(iwokkel.IDisco) 167 @implementer(iwokkel.IDisco)