comparison libervia/backend/plugins/plugin_xep_0374.py @ 4071:4b842c1fb686

refactoring: renamed `sat` package to `libervia.backend`
author Goffi <goffi@goffi.org>
date Fri, 02 Jun 2023 11:49:51 +0200
parents sat/plugins/plugin_xep_0374.py@c23cad65ae99
children 040095a5dc7f
comparison
equal deleted inserted replaced
4070:d10748475025 4071:4b842c1fb686
1 #!/usr/bin/env python3
2
3 # Libervia plugin for OpenPGP for XMPP Instant Messaging
4 # Copyright (C) 2022-2022 Tim Henkes (me@syndace.dev)
5
6 # This program is free software: you can redistribute it and/or modify
7 # it under the terms of the GNU Affero General Public License as published by
8 # the Free Software Foundation, either version 3 of the License, or
9 # (at your option) any later version.
10
11 # This program is distributed in the hope that it will be useful,
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 # GNU Affero General Public License for more details.
15
16 # You should have received a copy of the GNU Affero General Public License
17 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18
19 from typing import Dict, Optional, Set, cast
20
21 from typing_extensions import Final
22 from wokkel import muc # type: ignore[import]
23
24 from libervia.backend.core import exceptions
25 from libervia.backend.core.constants import Const as C
26 from libervia.backend.core.core_types import SatXMPPEntity
27 from libervia.backend.core.i18n import _, D_
28 from libervia.backend.core.log import getLogger, Logger
29 from libervia.backend.core.sat_main import SAT
30 from libervia.backend.core.xmpp import SatXMPPClient
31 from libervia.backend.plugins.plugin_xep_0045 import XEP_0045
32 from libervia.backend.plugins.plugin_xep_0334 import XEP_0334
33 from libervia.backend.plugins.plugin_xep_0373 import NS_OX, XEP_0373, TrustLevel
34 from libervia.backend.tools import xml_tools
35 from twisted.internet import defer
36 from twisted.words.protocols.jabber import jid
37 from twisted.words.xish import domish
38
39
40 __all__ = [ # pylint: disable=unused-variable
41 "PLUGIN_INFO",
42 "XEP_0374",
43 "NS_OXIM"
44 ]
45
46
47 log = cast(Logger, getLogger(__name__)) # type: ignore[no-untyped-call]
48
49
50 PLUGIN_INFO = {
51 C.PI_NAME: "OXIM",
52 C.PI_IMPORT_NAME: "XEP-0374",
53 C.PI_TYPE: "SEC",
54 C.PI_PROTOCOLS: [ "XEP-0374" ],
55 C.PI_DEPENDENCIES: [ "XEP-0334", "XEP-0373" ],
56 C.PI_RECOMMENDATIONS: [ "XEP-0045" ],
57 C.PI_MAIN: "XEP_0374",
58 C.PI_HANDLER: "no",
59 C.PI_DESCRIPTION: _("""Implementation of OXIM"""),
60 }
61
62
63 # The disco feature
64 NS_OXIM: Final = "urn:xmpp:openpgp:im:0"
65
66
67 class XEP_0374:
68 """
69 Plugin equipping Libervia with OXIM capabilities under the ``urn:xmpp:openpgp:im:0``
70 namespace. MUC messages are supported next to one to one messages. For trust
71 management, the two trust models "BTBV" and "manual" are supported.
72 """
73
74 def __init__(self, sat: SAT) -> None:
75 """
76 @param sat: The SAT instance.
77 """
78
79 self.__sat = sat
80
81 # Plugins
82 self.__xep_0045 = cast(Optional[XEP_0045], sat.plugins.get("XEP-0045"))
83 self.__xep_0334 = cast(XEP_0334, sat.plugins["XEP-0334"])
84 self.__xep_0373 = cast(XEP_0373, sat.plugins["XEP-0373"])
85
86 # Triggers
87 sat.trigger.add(
88 "message_received",
89 self.__message_received_trigger,
90 priority=100050
91 )
92 sat.trigger.add("send", self.__send_trigger, priority=0)
93
94 # Register the encryption plugin
95 sat.register_encryption_plugin(self, "OXIM", NS_OX, 102)
96
97 async def get_trust_ui( # pylint: disable=invalid-name
98 self,
99 client: SatXMPPClient,
100 entity: jid.JID
101 ) -> xml_tools.XMLUI:
102 """
103 @param client: The client.
104 @param entity: The entity whose device trust levels to manage.
105 @return: An XMLUI instance which opens a form to manage the trust level of all
106 devices belonging to the entity.
107 """
108
109 return await self.__xep_0373.get_trust_ui(client, entity)
110
111 @staticmethod
112 def __get_joined_muc_users(
113 client: SatXMPPClient,
114 xep_0045: XEP_0045,
115 room_jid: jid.JID
116 ) -> Set[jid.JID]:
117 """
118 @param client: The client.
119 @param xep_0045: A MUC plugin instance.
120 @param room_jid: The room JID.
121 @return: A set containing the bare JIDs of the MUC participants.
122 @raise InternalError: if the MUC is not joined or the entity information of a
123 participant isn't available.
124 """
125
126 bare_jids: Set[jid.JID] = set()
127
128 try:
129 room = cast(muc.Room, xep_0045.get_room(client, room_jid))
130 except exceptions.NotFound as e:
131 raise exceptions.InternalError(
132 "Participant list of unjoined MUC requested."
133 ) from e
134
135 for user in cast(Dict[str, muc.User], room.roster).values():
136 entity = cast(Optional[SatXMPPEntity], user.entity)
137 if entity is None:
138 raise exceptions.InternalError(
139 f"Participant list of MUC requested, but the entity information of"
140 f" the participant {user} is not available."
141 )
142
143 bare_jids.add(entity.jid.userhostJID())
144
145 return bare_jids
146
147 async def __message_received_trigger(
148 self,
149 client: SatXMPPClient,
150 message_elt: domish.Element,
151 post_treat: defer.Deferred
152 ) -> bool:
153 """
154 @param client: The client which received the message.
155 @param message_elt: The message element. Can be modified.
156 @param post_treat: A deferred which evaluates to a :class:`MessageData` once the
157 message has fully progressed through the message receiving flow. Can be used
158 to apply treatments to the fully processed message, like marking it as
159 encrypted.
160 @return: Whether to continue the message received flow.
161 """
162 sender_jid = jid.JID(message_elt["from"])
163 feedback_jid: jid.JID
164
165 message_type = message_elt.getAttribute("type", "unknown")
166 is_muc_message = message_type == C.MESS_TYPE_GROUPCHAT
167 if is_muc_message:
168 if self.__xep_0045 is None:
169 log.warning(
170 "Ignoring MUC message since plugin XEP-0045 is not available."
171 )
172 # Can't handle a MUC message without XEP-0045, let the flow continue
173 # normally
174 return True
175
176 room_jid = feedback_jid = sender_jid.userhostJID()
177
178 try:
179 room = cast(muc.Room, self.__xep_0045.get_room(client, room_jid))
180 except exceptions.NotFound:
181 log.warning(
182 f"Ignoring MUC message from a room that has not been joined:"
183 f" {room_jid}"
184 )
185 # Whatever, let the flow continue
186 return True
187
188 sender_user = cast(Optional[muc.User], room.getUser(sender_jid.resource))
189 if sender_user is None:
190 log.warning(
191 f"Ignoring MUC message from room {room_jid} since the sender's user"
192 f" wasn't found {sender_jid.resource}"
193 )
194 # Whatever, let the flow continue
195 return True
196
197 sender_user_jid = cast(Optional[jid.JID], sender_user.entity)
198 if sender_user_jid is None:
199 log.warning(
200 f"Ignoring MUC message from room {room_jid} since the sender's bare"
201 f" JID couldn't be found from its user information: {sender_user}"
202 )
203 # Whatever, let the flow continue
204 return True
205
206 sender_jid = sender_user_jid
207 else:
208 # I'm not sure why this check is required, this code is copied from XEP-0384
209 if sender_jid.userhostJID() == client.jid.userhostJID():
210 try:
211 feedback_jid = jid.JID(message_elt["to"])
212 except KeyError:
213 feedback_jid = client.server_jid
214 else:
215 feedback_jid = sender_jid
216
217 sender_bare_jid = sender_jid.userhost()
218
219 openpgp_elt = cast(Optional[domish.Element], next(
220 message_elt.elements(NS_OX, "openpgp"),
221 None
222 ))
223
224 if openpgp_elt is None:
225 # None of our business, let the flow continue
226 return True
227
228 try:
229 payload_elt, timestamp = await self.__xep_0373.unpack_openpgp_element(
230 client,
231 openpgp_elt,
232 "signcrypt",
233 jid.JID(sender_bare_jid)
234 )
235 except Exception as e:
236 # TODO: More specific exception handling
237 log.warning(_("Can't decrypt message: {reason}\n{xml}").format(
238 reason=e,
239 xml=message_elt.toXml()
240 ))
241 client.feedback(
242 feedback_jid,
243 D_(
244 f"An OXIM message from {sender_jid.full()} can't be decrypted:"
245 f" {e}"
246 ),
247 { C.MESS_EXTRA_INFO: C.EXTRA_INFO_DECR_ERR }
248 )
249 # No point in further processing this message
250 return False
251
252 message_elt.children.remove(openpgp_elt)
253
254 log.debug(f"OXIM message of type {message_type} received from {sender_bare_jid}")
255
256 # Remove all body elements from the original element, since those act as
257 # fallbacks in case the encryption protocol is not supported
258 for child in message_elt.elements():
259 if child.name == "body":
260 message_elt.children.remove(child)
261
262 # Move all extension elements from the payload to the stanza root
263 # TODO: There should probably be explicitly forbidden elements here too, just as
264 # for XEP-0420
265 for child in list(payload_elt.elements()):
266 # Remove the child from the content element
267 payload_elt.children.remove(child)
268
269 # Add the child to the stanza
270 message_elt.addChild(child)
271
272 # Mark the message as trusted or untrusted. Undecided counts as untrusted here.
273 trust_level = TrustLevel.UNDECIDED # TODO: Load the actual trust level
274 if trust_level is TrustLevel.TRUSTED:
275 post_treat.addCallback(client.encryption.mark_as_trusted)
276 else:
277 post_treat.addCallback(client.encryption.mark_as_untrusted)
278
279 # Mark the message as originally encrypted
280 post_treat.addCallback(
281 client.encryption.mark_as_encrypted,
282 namespace=NS_OX
283 )
284
285 # Message processed successfully, continue with the flow
286 return True
287
288 async def __send_trigger(self, client: SatXMPPClient, stanza: domish.Element) -> bool:
289 """
290 @param client: The client sending this message.
291 @param stanza: The stanza that is about to be sent. Can be modified.
292 @return: Whether the send message flow should continue or not.
293 """
294 # OXIM only handles message stanzas
295 if stanza.name != "message":
296 return True
297
298 # Get the intended recipient
299 recipient = stanza.getAttribute("to", None)
300 if recipient is None:
301 raise exceptions.InternalError(
302 f"Message without recipient encountered. Blocking further processing to"
303 f" avoid leaking plaintext data: {stanza.toXml()}"
304 )
305
306 # Parse the JID
307 recipient_bare_jid = jid.JID(recipient).userhostJID()
308
309 # Check whether encryption with OXIM is requested
310 encryption = client.encryption.getSession(recipient_bare_jid)
311
312 if encryption is None:
313 # Encryption is not requested for this recipient
314 return True
315
316 if encryption["plugin"].namespace != NS_OX:
317 # Encryption is requested for this recipient, but not with OXIM
318 return True
319
320 # All pre-checks done, we can start encrypting!
321 await self.__encrypt(
322 client,
323 stanza,
324 recipient_bare_jid,
325 stanza.getAttribute("type", "unkown") == C.MESS_TYPE_GROUPCHAT
326 )
327
328 # Add a store hint if this is a message stanza
329 self.__xep_0334.add_hint_elements(stanza, [ "store" ])
330
331 # Let the flow continue.
332 return True
333
334 async def __encrypt(
335 self,
336 client: SatXMPPClient,
337 stanza: domish.Element,
338 recipient_jid: jid.JID,
339 is_muc_message: bool
340 ) -> None:
341 """
342 @param client: The client.
343 @param stanza: The stanza, which is modified by this call.
344 @param recipient_jid: The JID of the recipient. Can be a bare (aka "userhost") JID
345 but doesn't have to.
346 @param is_muc_message: Whether the stanza is a message stanza to a MUC room.
347
348 @warning: The calling code MUST take care of adding the store message processing
349 hint to the stanza if applicable! This can be done before or after this call,
350 the order doesn't matter.
351 """
352
353 recipient_bare_jids: Set[jid.JID]
354 feedback_jid: jid.JID
355
356 if is_muc_message:
357 if self.__xep_0045 is None:
358 raise exceptions.InternalError(
359 "Encryption of MUC message requested, but plugin XEP-0045 is not"
360 " available."
361 )
362
363 room_jid = feedback_jid = recipient_jid.userhostJID()
364
365 recipient_bare_jids = self.__get_joined_muc_users(
366 client,
367 self.__xep_0045,
368 room_jid
369 )
370 else:
371 recipient_bare_jids = { recipient_jid.userhostJID() }
372 feedback_jid = recipient_jid.userhostJID()
373
374 log.debug(
375 f"Intercepting message that is to be encrypted by {NS_OX} for"
376 f" {recipient_bare_jids}"
377 )
378
379 signcrypt_elt, payload_elt = \
380 self.__xep_0373.build_signcrypt_element(recipient_bare_jids)
381
382 # Move elements from the stanza to the content element.
383 # TODO: There should probably be explicitly forbidden elements here too, just as
384 # for XEP-0420
385 for child in list(stanza.elements()):
386 if child.name == "openpgp" and child.uri == NS_OX:
387 log.debug("not re-encrypting encrypted OX element")
388 continue
389 # Remove the child from the stanza
390 stanza.children.remove(child)
391
392 # A namespace of ``None`` can be used on domish elements to inherit the
393 # namespace from the parent. When moving elements from the stanza root to
394 # the content element, however, we don't want elements to inherit the
395 # namespace of the content element. Thus, check for elements with ``None``
396 # for their namespace and set the namespace to jabber:client, which is the
397 # namespace of the parent element.
398 if child.uri is None:
399 child.uri = C.NS_CLIENT
400 child.defaultUri = C.NS_CLIENT
401
402 # Add the child with corrected namespaces to the content element
403 payload_elt.addChild(child)
404
405 try:
406 openpgp_elt = await self.__xep_0373.build_openpgp_element(
407 client,
408 signcrypt_elt,
409 recipient_bare_jids
410 )
411 except Exception as e:
412 msg = _(
413 # pylint: disable=consider-using-f-string
414 "Can't encrypt message for {entities}: {reason}".format(
415 entities=', '.join(jid.userhost() for jid in recipient_bare_jids),
416 reason=e
417 )
418 )
419 log.warning(msg)
420 client.feedback(feedback_jid, msg, {
421 C.MESS_EXTRA_INFO: C.EXTRA_INFO_ENCR_ERR
422 })
423 raise e
424
425 stanza.addChild(openpgp_elt)