Mercurial > libervia-backend
comparison libervia/backend/plugins/plugin_xep_0374.py @ 4071:4b842c1fb686
refactoring: renamed `sat` package to `libervia.backend`
author | Goffi <goffi@goffi.org> |
---|---|
date | Fri, 02 Jun 2023 11:49:51 +0200 |
parents | sat/plugins/plugin_xep_0374.py@c23cad65ae99 |
children | 040095a5dc7f |
comparison
equal
deleted
inserted
replaced
4070:d10748475025 | 4071:4b842c1fb686 |
---|---|
1 #!/usr/bin/env python3 | |
2 | |
3 # Libervia plugin for OpenPGP for XMPP Instant Messaging | |
4 # Copyright (C) 2022-2022 Tim Henkes (me@syndace.dev) | |
5 | |
6 # This program is free software: you can redistribute it and/or modify | |
7 # it under the terms of the GNU Affero General Public License as published by | |
8 # the Free Software Foundation, either version 3 of the License, or | |
9 # (at your option) any later version. | |
10 | |
11 # This program is distributed in the hope that it will be useful, | |
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of | |
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |
14 # GNU Affero General Public License for more details. | |
15 | |
16 # You should have received a copy of the GNU Affero General Public License | |
17 # along with this program. If not, see <http://www.gnu.org/licenses/>. | |
18 | |
19 from typing import Dict, Optional, Set, cast | |
20 | |
21 from typing_extensions import Final | |
22 from wokkel import muc # type: ignore[import] | |
23 | |
24 from libervia.backend.core import exceptions | |
25 from libervia.backend.core.constants import Const as C | |
26 from libervia.backend.core.core_types import SatXMPPEntity | |
27 from libervia.backend.core.i18n import _, D_ | |
28 from libervia.backend.core.log import getLogger, Logger | |
29 from libervia.backend.core.sat_main import SAT | |
30 from libervia.backend.core.xmpp import SatXMPPClient | |
31 from libervia.backend.plugins.plugin_xep_0045 import XEP_0045 | |
32 from libervia.backend.plugins.plugin_xep_0334 import XEP_0334 | |
33 from libervia.backend.plugins.plugin_xep_0373 import NS_OX, XEP_0373, TrustLevel | |
34 from libervia.backend.tools import xml_tools | |
35 from twisted.internet import defer | |
36 from twisted.words.protocols.jabber import jid | |
37 from twisted.words.xish import domish | |
38 | |
39 | |
# Public names of this module: the plugin metadata, the plugin class and the OXIM
# namespace constant.
__all__ = [ # pylint: disable=unused-variable
    "PLUGIN_INFO",
    "XEP_0374",
    "NS_OXIM"
]


# Module-level logger, named after this module.
log = cast(Logger, getLogger(__name__)) # type: ignore[no-untyped-call]


# Plugin metadata. XEP-0334 and XEP-0373 are declared as dependencies and are looked up
# unconditionally by the plugin class; XEP-0045 is only a recommendation and the plugin
# class copes with it being absent.
# NOTE(review): presumably this dictionary is read by the plugin loader - confirm the
# exact semantics of the individual keys against the loader.
PLUGIN_INFO = {
    C.PI_NAME: "OXIM",
    C.PI_IMPORT_NAME: "XEP-0374",
    C.PI_TYPE: "SEC",
    C.PI_PROTOCOLS: [ "XEP-0374" ],
    C.PI_DEPENDENCIES: [ "XEP-0334", "XEP-0373" ],
    C.PI_RECOMMENDATIONS: [ "XEP-0045" ],
    C.PI_MAIN: "XEP_0374",
    C.PI_HANDLER: "no",
    C.PI_DESCRIPTION: _("""Implementation of OXIM"""),
}


# The disco feature
NS_OXIM: Final = "urn:xmpp:openpgp:im:0"
65 | |
66 | |
class XEP_0374:
    """
    Plugin equipping Libervia with OXIM capabilities under the ``urn:xmpp:openpgp:im:0``
    namespace. MUC messages are supported next to one to one messages. For trust
    management, the two trust models "BTBV" and "manual" are supported.
    """

    def __init__(self, sat: SAT) -> None:
        """
        Look up the plugins this plugin builds upon, hook into the message receiving and
        sending flows and register OXIM as an encryption plugin.

        @param sat: The SAT instance.
        """

        self.__sat = sat

        # Plugins
        # XEP-0045 is fetched with ``get`` since it may be missing; the other two are
        # required and looked up directly.
        self.__xep_0045 = cast(Optional[XEP_0045], sat.plugins.get("XEP-0045"))
        self.__xep_0334 = cast(XEP_0334, sat.plugins["XEP-0334"])
        self.__xep_0373 = cast(XEP_0373, sat.plugins["XEP-0373"])

        # Triggers
        sat.trigger.add(
            "message_received",
            self.__message_received_trigger,
            priority=100050
        )
        sat.trigger.add("send", self.__send_trigger, priority=0)

        # Register the encryption plugin
        sat.register_encryption_plugin(self, "OXIM", NS_OX, 102)

    async def get_trust_ui(  # pylint: disable=invalid-name
        self,
        client: SatXMPPClient,
        entity: jid.JID
    ) -> xml_tools.XMLUI:
        """
        Delegates to the trust UI of the XEP-0373 plugin.

        @param client: The client.
        @param entity: The entity whose device trust levels to manage.
        @return: An XMLUI instance which opens a form to manage the trust level of all
            devices belonging to the entity.
        """

        return await self.__xep_0373.get_trust_ui(client, entity)

    @staticmethod
    def __get_joined_muc_users(
        client: SatXMPPClient,
        xep_0045: XEP_0045,
        room_jid: jid.JID
    ) -> Set[jid.JID]:
        """
        @param client: The client.
        @param xep_0045: A MUC plugin instance.
        @param room_jid: The room JID.
        @return: A set containing the bare JIDs of the MUC participants.
        @raise InternalError: if the MUC is not joined or the entity information of a
            participant isn't available.
        """

        bare_jids: Set[jid.JID] = set()

        try:
            room = cast(muc.Room, xep_0045.get_room(client, room_jid))
        except exceptions.NotFound as e:
            raise exceptions.InternalError(
                "Participant list of unjoined MUC requested."
            ) from e

        for user in cast(Dict[str, muc.User], room.roster).values():
            # NOTE(review): the message received trigger of this class treats
            # ``user.entity`` as a JID, while here it is treated as an object with a
            # ``jid`` attribute - confirm which of the two is correct.
            entity = cast(Optional[SatXMPPEntity], user.entity)
            if entity is None:
                raise exceptions.InternalError(
                    f"Participant list of MUC requested, but the entity information of"
                    f" the participant {user} is not available."
                )

            bare_jids.add(entity.jid.userhostJID())

        return bare_jids

    async def __message_received_trigger(
        self,
        client: SatXMPPClient,
        message_elt: domish.Element,
        post_treat: defer.Deferred
    ) -> bool:
        """
        Decrypts incoming OXIM messages in place: the ``<openpgp/>`` element and all
        fallback bodies are removed from the stanza and replaced by the decrypted
        payload. Messages without an ``<openpgp/>`` element are left untouched.

        @param client: The client which received the message.
        @param message_elt: The message element. Can be modified.
        @param post_treat: A deferred which evaluates to a :class:`MessageData` once the
            message has fully progressed through the message receiving flow. Can be used
            to apply treatments to the fully processed message, like marking it as
            encrypted.
        @return: Whether to continue the message received flow.
        """
        sender_jid = jid.JID(message_elt["from"])
        feedback_jid: jid.JID

        message_type = message_elt.getAttribute("type", "unknown")
        is_muc_message = message_type == C.MESS_TYPE_GROUPCHAT
        if is_muc_message:
            if self.__xep_0045 is None:
                log.warning(
                    "Ignoring MUC message since plugin XEP-0045 is not available."
                )
                # Can't handle a MUC message without XEP-0045, let the flow continue
                # normally
                return True

            room_jid = feedback_jid = sender_jid.userhostJID()

            try:
                room = cast(muc.Room, self.__xep_0045.get_room(client, room_jid))
            except exceptions.NotFound:
                log.warning(
                    f"Ignoring MUC message from a room that has not been joined:"
                    f" {room_jid}"
                )
                # Whatever, let the flow continue
                return True

            sender_user = cast(Optional[muc.User], room.getUser(sender_jid.resource))
            if sender_user is None:
                log.warning(
                    f"Ignoring MUC message from room {room_jid} since the sender's user"
                    f" wasn't found {sender_jid.resource}"
                )
                # Whatever, let the flow continue
                return True

            sender_user_jid = cast(Optional[jid.JID], sender_user.entity)
            if sender_user_jid is None:
                log.warning(
                    f"Ignoring MUC message from room {room_jid} since the sender's bare"
                    f" JID couldn't be found from its user information: {sender_user}"
                )
                # Whatever, let the flow continue
                return True

            # From here on, work with the real JID of the sender rather than the
            # in-room JID
            sender_jid = sender_user_jid
        else:
            # I'm not sure why this check is required, this code is copied from XEP-0384
            if sender_jid.userhostJID() == client.jid.userhostJID():
                try:
                    feedback_jid = jid.JID(message_elt["to"])
                except KeyError:
                    feedback_jid = client.server_jid
            else:
                feedback_jid = sender_jid

        sender_bare_jid = sender_jid.userhost()

        openpgp_elt = cast(Optional[domish.Element], next(
            message_elt.elements(NS_OX, "openpgp"),
            None
        ))

        if openpgp_elt is None:
            # None of our business, let the flow continue
            return True

        try:
            payload_elt, timestamp = await self.__xep_0373.unpack_openpgp_element(
                client,
                openpgp_elt,
                "signcrypt",
                jid.JID(sender_bare_jid)
            )
        except Exception as e:
            # TODO: More specific exception handling
            log.warning(_("Can't decrypt message: {reason}\n{xml}").format(
                reason=e,
                xml=message_elt.toXml()
            ))
            client.feedback(
                feedback_jid,
                D_(
                    f"An OXIM message from {sender_jid.full()} can't be decrypted:"
                    f" {e}"
                ),
                { C.MESS_EXTRA_INFO: C.EXTRA_INFO_DECR_ERR }
            )
            # No point in further processing this message
            return False

        message_elt.children.remove(openpgp_elt)

        log.debug(f"OXIM message of type {message_type} received from {sender_bare_jid}")

        # Remove all body elements from the original element, since those act as
        # fallbacks in case the encryption protocol is not supported.
        # Iterate over a snapshot: ``elements()`` lazily walks ``children``, so removing
        # entries during the iteration would skip the element following each removed
        # one and leave some fallback bodies in place.
        for child in list(message_elt.elements()):
            if child.name == "body":
                message_elt.children.remove(child)

        # Move all extension elements from the payload to the stanza root
        # TODO: There should probably be explicitly forbidden elements here too, just as
        # for XEP-0420
        for child in list(payload_elt.elements()):
            # Remove the child from the content element
            payload_elt.children.remove(child)

            # Add the child to the stanza
            message_elt.addChild(child)

        # Mark the message as trusted or untrusted. Undecided counts as untrusted here.
        trust_level = TrustLevel.UNDECIDED  # TODO: Load the actual trust level
        if trust_level is TrustLevel.TRUSTED:
            post_treat.addCallback(client.encryption.mark_as_trusted)
        else:
            post_treat.addCallback(client.encryption.mark_as_untrusted)

        # Mark the message as originally encrypted
        post_treat.addCallback(
            client.encryption.mark_as_encrypted,
            namespace=NS_OX
        )

        # Message processed successfully, continue with the flow
        return True

    async def __send_trigger(self, client: SatXMPPClient, stanza: domish.Element) -> bool:
        """
        Encrypts outgoing message stanzas for which an OXIM encryption session exists
        and adds a store hint to them. Other stanzas are passed through unmodified.

        @param client: The client sending this message.
        @param stanza: The stanza that is about to be sent. Can be modified.
        @return: Whether the send message flow should continue or not.
        @raise InternalError: if a message stanza without recipient is encountered.
        """
        # OXIM only handles message stanzas
        if stanza.name != "message":
            return True

        # Get the intended recipient
        recipient = stanza.getAttribute("to", None)
        if recipient is None:
            raise exceptions.InternalError(
                f"Message without recipient encountered. Blocking further processing to"
                f" avoid leaking plaintext data: {stanza.toXml()}"
            )

        # Parse the JID
        recipient_bare_jid = jid.JID(recipient).userhostJID()

        # Check whether encryption with OXIM is requested
        encryption = client.encryption.getSession(recipient_bare_jid)

        if encryption is None:
            # Encryption is not requested for this recipient
            return True

        if encryption["plugin"].namespace != NS_OX:
            # Encryption is requested for this recipient, but not with OXIM
            return True

        # All pre-checks done, we can start encrypting!
        await self.__encrypt(
            client,
            stanza,
            recipient_bare_jid,
            stanza.getAttribute("type", "unknown") == C.MESS_TYPE_GROUPCHAT
        )

        # Add a store hint if this is a message stanza
        self.__xep_0334.add_hint_elements(stanza, [ "store" ])

        # Let the flow continue.
        return True

    async def __encrypt(
        self,
        client: SatXMPPClient,
        stanza: domish.Element,
        recipient_jid: jid.JID,
        is_muc_message: bool
    ) -> None:
        """
        Moves the children of the stanza into a signcrypt payload, encrypts that payload
        for all recipients and adds the resulting ``<openpgp/>`` element to the stanza.

        @param client: The client.
        @param stanza: The stanza, which is modified by this call.
        @param recipient_jid: The JID of the recipient. Can be a bare (aka "userhost") JID
            but doesn't have to.
        @param is_muc_message: Whether the stanza is a message stanza to a MUC room.
        @raise InternalError: if a MUC message is to be encrypted but plugin XEP-0045 is
            not available.
        @raise Exception: any exception raised while building the ``<openpgp/>`` element
            is logged, reported as feedback and then re-raised.

        @warning: The calling code MUST take care of adding the store message processing
            hint to the stanza if applicable! This can be done before or after this call,
            the order doesn't matter.
        """

        recipient_bare_jids: Set[jid.JID]
        feedback_jid: jid.JID

        if is_muc_message:
            if self.__xep_0045 is None:
                raise exceptions.InternalError(
                    "Encryption of MUC message requested, but plugin XEP-0045 is not"
                    " available."
                )

            room_jid = feedback_jid = recipient_jid.userhostJID()

            recipient_bare_jids = self.__get_joined_muc_users(
                client,
                self.__xep_0045,
                room_jid
            )
        else:
            recipient_bare_jids = { recipient_jid.userhostJID() }
            feedback_jid = recipient_jid.userhostJID()

        log.debug(
            f"Intercepting message that is to be encrypted by {NS_OX} for"
            f" {recipient_bare_jids}"
        )

        signcrypt_elt, payload_elt = \
            self.__xep_0373.build_signcrypt_element(recipient_bare_jids)

        # Move elements from the stanza to the content element.
        # TODO: There should probably be explicitly forbidden elements here too, just as
        # for XEP-0420
        for child in list(stanza.elements()):
            if child.name == "openpgp" and child.uri == NS_OX:
                log.debug("not re-encrypting encrypted OX element")
                continue
            # Remove the child from the stanza
            stanza.children.remove(child)

            # A namespace of ``None`` can be used on domish elements to inherit the
            # namespace from the parent. When moving elements from the stanza root to
            # the content element, however, we don't want elements to inherit the
            # namespace of the content element. Thus, check for elements with ``None``
            # for their namespace and set the namespace to jabber:client, which is the
            # namespace of the parent element.
            if child.uri is None:
                child.uri = C.NS_CLIENT
                child.defaultUri = C.NS_CLIENT

            # Add the child with corrected namespaces to the content element
            payload_elt.addChild(child)

        try:
            openpgp_elt = await self.__xep_0373.build_openpgp_element(
                client,
                signcrypt_elt,
                recipient_bare_jids
            )
        except Exception as e:
            # Translate the template first and fill in the values afterwards, otherwise
            # the already formatted string can never match a translation entry.
            msg = _("Can't encrypt message for {entities}: {reason}").format(
                entities=', '.join(
                    recipient.userhost() for recipient in recipient_bare_jids
                ),
                reason=e
            )
            log.warning(msg)
            client.feedback(feedback_jid, msg, {
                C.MESS_EXTRA_INFO: C.EXTRA_INFO_ENCR_ERR
            })
            raise

        stanza.addChild(openpgp_elt)