Mercurial > libervia-backend
comparison libervia/backend/plugins/plugin_xep_0374.py @ 4270:0d7bb4df2343
Reformatted code base using black.
author | Goffi <goffi@goffi.org> |
---|---|
date | Wed, 19 Jun 2024 18:44:57 +0200 |
parents | 7c5654c54fed |
children |
comparison
equal
deleted
inserted
replaced
4269:64a85ce8be70 | 4270:0d7bb4df2343 |
---|---|
35 from twisted.internet import defer | 35 from twisted.internet import defer |
36 from twisted.words.protocols.jabber import jid | 36 from twisted.words.protocols.jabber import jid |
37 from twisted.words.xish import domish | 37 from twisted.words.xish import domish |
38 | 38 |
39 | 39 |
40 __all__ = [ # pylint: disable=unused-variable | 40 __all__ = ["PLUGIN_INFO", "XEP_0374", "NS_OXIM"] # pylint: disable=unused-variable |
41 "PLUGIN_INFO", | |
42 "XEP_0374", | |
43 "NS_OXIM" | |
44 ] | |
45 | 41 |
46 | 42 |
47 log = cast(Logger, getLogger(__name__)) # type: ignore[no-untyped-call] | 43 log = cast(Logger, getLogger(__name__)) # type: ignore[no-untyped-call] |
48 | 44 |
49 | 45 |
50 PLUGIN_INFO = { | 46 PLUGIN_INFO = { |
51 C.PI_NAME: "OXIM", | 47 C.PI_NAME: "OXIM", |
52 C.PI_IMPORT_NAME: "XEP-0374", | 48 C.PI_IMPORT_NAME: "XEP-0374", |
53 C.PI_TYPE: "SEC", | 49 C.PI_TYPE: "SEC", |
54 C.PI_PROTOCOLS: [ "XEP-0374" ], | 50 C.PI_PROTOCOLS: ["XEP-0374"], |
55 C.PI_DEPENDENCIES: [ "XEP-0334", "XEP-0373" ], | 51 C.PI_DEPENDENCIES: ["XEP-0334", "XEP-0373"], |
56 C.PI_RECOMMENDATIONS: [ "XEP-0045" ], | 52 C.PI_RECOMMENDATIONS: ["XEP-0045"], |
57 C.PI_MAIN: "XEP_0374", | 53 C.PI_MAIN: "XEP_0374", |
58 C.PI_HANDLER: "no", | 54 C.PI_HANDLER: "no", |
59 C.PI_DESCRIPTION: _("""Implementation of OXIM"""), | 55 C.PI_DESCRIPTION: _("""Implementation of OXIM"""), |
60 } | 56 } |
61 | 57 |
83 self.__xep_0334 = cast(XEP_0334, sat.plugins["XEP-0334"]) | 79 self.__xep_0334 = cast(XEP_0334, sat.plugins["XEP-0334"]) |
84 self.__xep_0373 = cast(XEP_0373, sat.plugins["XEP-0373"]) | 80 self.__xep_0373 = cast(XEP_0373, sat.plugins["XEP-0373"]) |
85 | 81 |
86 # Triggers | 82 # Triggers |
87 sat.trigger.add( | 83 sat.trigger.add( |
88 "message_received", | 84 "message_received", self.__message_received_trigger, priority=100050 |
89 self.__message_received_trigger, | |
90 priority=100050 | |
91 ) | 85 ) |
92 sat.trigger.add("send", self.__send_trigger, priority=0) | 86 sat.trigger.add("send", self.__send_trigger, priority=0) |
93 | 87 |
94 # Register the encryption plugin | 88 # Register the encryption plugin |
95 sat.register_encryption_plugin(self, "OXIM", NS_OX, 102) | 89 sat.register_encryption_plugin(self, "OXIM", NS_OX, 102) |
96 | 90 |
97 async def get_trust_ui( # pylint: disable=invalid-name | 91 async def get_trust_ui( # pylint: disable=invalid-name |
98 self, | 92 self, client: SatXMPPClient, entity: jid.JID |
99 client: SatXMPPClient, | |
100 entity: jid.JID | |
101 ) -> xml_tools.XMLUI: | 93 ) -> xml_tools.XMLUI: |
102 """ | 94 """ |
103 @param client: The client. | 95 @param client: The client. |
104 @param entity: The entity whose device trust levels to manage. | 96 @param entity: The entity whose device trust levels to manage. |
105 @return: An XMLUI instance which opens a form to manage the trust level of all | 97 @return: An XMLUI instance which opens a form to manage the trust level of all |
108 | 100 |
109 return await self.__xep_0373.get_trust_ui(client, entity) | 101 return await self.__xep_0373.get_trust_ui(client, entity) |
110 | 102 |
111 @staticmethod | 103 @staticmethod |
112 def __get_joined_muc_users( | 104 def __get_joined_muc_users( |
113 client: SatXMPPClient, | 105 client: SatXMPPClient, xep_0045: XEP_0045, room_jid: jid.JID |
114 xep_0045: XEP_0045, | |
115 room_jid: jid.JID | |
116 ) -> Set[jid.JID]: | 106 ) -> Set[jid.JID]: |
117 """ | 107 """ |
118 @param client: The client. | 108 @param client: The client. |
119 @param xep_0045: A MUC plugin instance. | 109 @param xep_0045: A MUC plugin instance. |
120 @param room_jid: The room JID. | 110 @param room_jid: The room JID. |
146 | 136 |
147 async def __message_received_trigger( | 137 async def __message_received_trigger( |
148 self, | 138 self, |
149 client: SatXMPPClient, | 139 client: SatXMPPClient, |
150 message_elt: domish.Element, | 140 message_elt: domish.Element, |
151 post_treat: defer.Deferred | 141 post_treat: defer.Deferred, |
152 ) -> bool: | 142 ) -> bool: |
153 """ | 143 """ |
154 @param client: The client which received the message. | 144 @param client: The client which received the message. |
155 @param message_elt: The message element. Can be modified. | 145 @param message_elt: The message element. Can be modified. |
156 @param post_treat: A deferred which evaluates to a :class:`MessageData` once the | 146 @param post_treat: A deferred which evaluates to a :class:`MessageData` once the |
214 else: | 204 else: |
215 feedback_jid = sender_jid | 205 feedback_jid = sender_jid |
216 | 206 |
217 sender_bare_jid = sender_jid.userhost() | 207 sender_bare_jid = sender_jid.userhost() |
218 | 208 |
219 openpgp_elt = cast(Optional[domish.Element], next( | 209 openpgp_elt = cast( |
220 message_elt.elements(NS_OX, "openpgp"), | 210 Optional[domish.Element], next(message_elt.elements(NS_OX, "openpgp"), None) |
221 None | 211 ) |
222 )) | |
223 | 212 |
224 if openpgp_elt is None: | 213 if openpgp_elt is None: |
225 # None of our business, let the flow continue | 214 # None of our business, let the flow continue |
226 return True | 215 return True |
227 | 216 |
228 try: | 217 try: |
229 payload_elt, timestamp = await self.__xep_0373.unpack_openpgp_element( | 218 payload_elt, timestamp = await self.__xep_0373.unpack_openpgp_element( |
230 client, | 219 client, openpgp_elt, "signcrypt", jid.JID(sender_bare_jid) |
231 openpgp_elt, | |
232 "signcrypt", | |
233 jid.JID(sender_bare_jid) | |
234 ) | 220 ) |
235 except Exception as e: | 221 except Exception as e: |
236 # TODO: More specific exception handling | 222 # TODO: More specific exception handling |
237 log.warning(_("Can't decrypt message: {reason}\n{xml}").format( | 223 log.warning( |
238 reason=e, | 224 _("Can't decrypt message: {reason}\n{xml}").format( |
239 xml=message_elt.toXml() | 225 reason=e, xml=message_elt.toXml() |
240 )) | 226 ) |
227 ) | |
241 client.feedback( | 228 client.feedback( |
242 feedback_jid, | 229 feedback_jid, |
243 D_( | 230 D_( |
244 f"An OXIM message from {sender_jid.full()} can't be decrypted:" | 231 f"An OXIM message from {sender_jid.full()} can't be decrypted:" |
245 f" {e}" | 232 f" {e}" |
246 ), | 233 ), |
247 { C.MESS_EXTRA_INFO: C.EXTRA_INFO_DECR_ERR } | 234 {C.MESS_EXTRA_INFO: C.EXTRA_INFO_DECR_ERR}, |
248 ) | 235 ) |
249 # No point in further processing this message | 236 # No point in further processing this message |
250 return False | 237 return False |
251 | 238 |
252 message_elt.children.remove(openpgp_elt) | 239 message_elt.children.remove(openpgp_elt) |
275 post_treat.addCallback(client.encryption.mark_as_trusted) | 262 post_treat.addCallback(client.encryption.mark_as_trusted) |
276 else: | 263 else: |
277 post_treat.addCallback(client.encryption.mark_as_untrusted) | 264 post_treat.addCallback(client.encryption.mark_as_untrusted) |
278 | 265 |
279 # Mark the message as originally encrypted | 266 # Mark the message as originally encrypted |
280 post_treat.addCallback( | 267 post_treat.addCallback(client.encryption.mark_as_encrypted, namespace=NS_OX) |
281 client.encryption.mark_as_encrypted, | |
282 namespace=NS_OX | |
283 ) | |
284 | 268 |
285 # Message processed successfully, continue with the flow | 269 # Message processed successfully, continue with the flow |
286 return True | 270 return True |
287 | 271 |
288 async def __send_trigger(self, client: SatXMPPClient, stanza: domish.Element) -> bool: | 272 async def __send_trigger(self, client: SatXMPPClient, stanza: domish.Element) -> bool: |
320 # All pre-checks done, we can start encrypting! | 304 # All pre-checks done, we can start encrypting! |
321 await self.__encrypt( | 305 await self.__encrypt( |
322 client, | 306 client, |
323 stanza, | 307 stanza, |
324 recipient_bare_jid, | 308 recipient_bare_jid, |
325 stanza.getAttribute("type", "unkown") == C.MESS_TYPE_GROUPCHAT | 309 stanza.getAttribute("type", "unkown") == C.MESS_TYPE_GROUPCHAT, |
326 ) | 310 ) |
327 | 311 |
328 # Add a store hint if this is a message stanza | 312 # Add a store hint if this is a message stanza |
329 self.__xep_0334.add_hint_elements(stanza, [ "store" ]) | 313 self.__xep_0334.add_hint_elements(stanza, ["store"]) |
330 | 314 |
331 # Let the flow continue. | 315 # Let the flow continue. |
332 return True | 316 return True |
333 | 317 |
334 async def __encrypt( | 318 async def __encrypt( |
335 self, | 319 self, |
336 client: SatXMPPClient, | 320 client: SatXMPPClient, |
337 stanza: domish.Element, | 321 stanza: domish.Element, |
338 recipient_jid: jid.JID, | 322 recipient_jid: jid.JID, |
339 is_muc_message: bool | 323 is_muc_message: bool, |
340 ) -> None: | 324 ) -> None: |
341 """ | 325 """ |
342 @param client: The client. | 326 @param client: The client. |
343 @param stanza: The stanza, which is modified by this call. | 327 @param stanza: The stanza, which is modified by this call. |
344 @param recipient_jid: The JID of the recipient. Can be a bare (aka "userhost") JID | 328 @param recipient_jid: The JID of the recipient. Can be a bare (aka "userhost") JID |
361 ) | 345 ) |
362 | 346 |
363 room_jid = feedback_jid = recipient_jid.userhostJID() | 347 room_jid = feedback_jid = recipient_jid.userhostJID() |
364 | 348 |
365 recipient_bare_jids = self.__get_joined_muc_users( | 349 recipient_bare_jids = self.__get_joined_muc_users( |
366 client, | 350 client, self.__xep_0045, room_jid |
367 self.__xep_0045, | |
368 room_jid | |
369 ) | 351 ) |
370 else: | 352 else: |
371 recipient_bare_jids = { recipient_jid.userhostJID() } | 353 recipient_bare_jids = {recipient_jid.userhostJID()} |
372 feedback_jid = recipient_jid.userhostJID() | 354 feedback_jid = recipient_jid.userhostJID() |
373 | 355 |
374 log.debug( | 356 log.debug( |
375 f"Intercepting message that is to be encrypted by {NS_OX} for" | 357 f"Intercepting message that is to be encrypted by {NS_OX} for" |
376 f" {recipient_bare_jids}" | 358 f" {recipient_bare_jids}" |
377 ) | 359 ) |
378 | 360 |
379 signcrypt_elt, payload_elt = \ | 361 signcrypt_elt, payload_elt = self.__xep_0373.build_signcrypt_element( |
380 self.__xep_0373.build_signcrypt_element(recipient_bare_jids) | 362 recipient_bare_jids |
363 ) | |
381 | 364 |
382 # Move elements from the stanza to the content element. | 365 # Move elements from the stanza to the content element. |
383 # TODO: There should probably be explicitly forbidden elements here too, just as | 366 # TODO: There should probably be explicitly forbidden elements here too, just as |
384 # for XEP-0420 | 367 # for XEP-0420 |
385 for child in list(stanza.elements()): | 368 for child in list(stanza.elements()): |
402 # Add the child with corrected namespaces to the content element | 385 # Add the child with corrected namespaces to the content element |
403 payload_elt.addChild(child) | 386 payload_elt.addChild(child) |
404 | 387 |
405 try: | 388 try: |
406 openpgp_elt = await self.__xep_0373.build_openpgp_element( | 389 openpgp_elt = await self.__xep_0373.build_openpgp_element( |
407 client, | 390 client, signcrypt_elt, recipient_bare_jids |
408 signcrypt_elt, | |
409 recipient_bare_jids | |
410 ) | 391 ) |
411 except Exception as e: | 392 except Exception as e: |
412 msg = _( | 393 msg = _( |
413 # pylint: disable=consider-using-f-string | 394 # pylint: disable=consider-using-f-string |
414 "Can't encrypt message for {entities}: {reason}".format( | 395 "Can't encrypt message for {entities}: {reason}".format( |
415 entities=', '.join(jid.userhost() for jid in recipient_bare_jids), | 396 entities=", ".join(jid.userhost() for jid in recipient_bare_jids), |
416 reason=e | 397 reason=e, |
417 ) | 398 ) |
418 ) | 399 ) |
419 log.warning(msg) | 400 log.warning(msg) |
420 client.feedback(feedback_jid, msg, { | 401 client.feedback(feedback_jid, msg, {C.MESS_EXTRA_INFO: C.EXTRA_INFO_ENCR_ERR}) |
421 C.MESS_EXTRA_INFO: C.EXTRA_INFO_ENCR_ERR | |
422 }) | |
423 raise e | 402 raise e |
424 | 403 |
425 stanza.addChild(openpgp_elt) | 404 stanza.addChild(openpgp_elt) |