comparison sat/plugins/plugin_sec_oxps.py @ 3934:e345d93fb6e5

plugin OXPS: OpenPGP for XMPP Pubsub implementation: OpenPGP for XMPP Pubsub (https://xmpp.org/extensions/inbox/pubsub-encryption.html, currently a protoXEP) is implemented and activated when `encrypted` is set to `True` in pubsub's `extra` data. On item retrieval, the decryption is transparent if the key is known, except if the `decrypt` key in `extra` is set to `False` (notably useful when one wants to checks that data is well encrypted). Methods and corresponding bridge methods have been implemented to manage shared secrets (to share, revoke or rotate the secrets). plugin XEP-0060's `XEP-0060_publish` trigger point as been move before actual publish so item can be modified (here e2ee) by the triggers. A new `XEP-0060_items` trigger point has also been added. `encrypted` flag can be used with plugin XEP-0277's microblog data rel 380
author Goffi <goffi@goffi.org>
date Sat, 15 Oct 2022 20:36:53 +0200
parents
children cd4d95b3fed3
comparison
equal deleted inserted replaced
3933:cecf45416403 3934:e345d93fb6e5
1 #!/usr/bin/env python3
2
3 # Libervia plugin for Pubsub Encryption
4 # Copyright (C) 2009-2022 Jérôme Poisson (goffi@goffi.org)
5
6 # This program is free software: you can redistribute it and/or modify
7 # it under the terms of the GNU Affero General Public License as published by
8 # the Free Software Foundation, either version 3 of the License, or
9 # (at your option) any later version.
10
11 # This program is distributed in the hope that it will be useful,
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 # GNU Affero General Public License for more details.
15
16 # You should have received a copy of the GNU Affero General Public License
17 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18
19 import base64
20 import dataclasses
21 import secrets
22 import time
23 from typing import Any, Dict, Iterable, List, Optional, Set, Tuple, Union
24 from collections import OrderedDict
25
26 import shortuuid
27 from twisted.internet import defer
28 from twisted.words.protocols.jabber import jid, xmlstream
29 from twisted.words.xish import domish
30 from wokkel import disco, iwokkel
31 from wokkel import rsm
32 from zope.interface import implementer
33
34 from sat.core import exceptions
35 from sat.core.constants import Const as C
36 from sat.core.core_types import SatXMPPEntity
37 from sat.core.i18n import _
38 from sat.core.log import getLogger
39 from sat.memory import persistent
40 from sat.tools import utils
41 from sat.tools import xml_tools
42 from sat.tools.common import data_format
43 from sat.tools.common import uri
44 from sat.tools.common.async_utils import async_lru
45
46 from .plugin_xep_0373 import NS_OX, get_gpg_provider
47
48
49 log = getLogger(__name__)
50
51 IMPORT_NAME = "OXPS"
52
53 PLUGIN_INFO = {
54 C.PI_NAME: "OpenPGP for XMPP Pubsub",
55 C.PI_IMPORT_NAME: IMPORT_NAME,
56 C.PI_TYPE: C.PLUG_TYPE_XEP,
57 C.PI_MODES: C.PLUG_MODE_BOTH,
58 C.PI_PROTOCOLS: [],
59 C.PI_DEPENDENCIES: ["XEP-0060", "XEP-0334", "XEP-0373"],
60 C.PI_MAIN: "PubsubEncryption",
61 C.PI_HANDLER: "yes",
62 C.PI_DESCRIPTION: _("""Pubsub e2e encryption via OpenPGP"""),
63 }
64 NS_OXPS = "urn:xmpp:openpgp:pubsub:0"
65
66 KEY_REVOKED = "revoked"
67 CACHE_MAX = 5
68
69
70 @dataclasses.dataclass
71 class SharedSecret:
72 id: str
73 key: str
74 timestamp: float
75 # bare JID of who has generated the secret
76 origin: jid.JID
77 revoked: bool = False
78 shared_with: Set[jid.JID] = dataclasses.field(default_factory=set)
79
80
81 class PubsubEncryption:
82 namespace = NS_OXPS
83
84 def __init__(self, host):
85 log.info(_("OpenPGP for XMPP Pubsub plugin initialization"))
86 host.registerNamespace("oxps", NS_OXPS)
87 self.host = host
88 self._p = host.plugins["XEP-0060"]
89 self._h = host.plugins["XEP-0334"]
90 self._ox = host.plugins["XEP-0373"]
91 host.trigger.add("XEP-0060_publish", self._publish_trigger)
92 host.trigger.add("XEP-0060_items", self._items_trigger)
93 host.trigger.add(
94 "messageReceived",
95 self._message_received_trigger,
96 )
97 host.bridge.addMethod(
98 "psSecretShare",
99 ".plugin",
100 in_sign="sssass",
101 out_sign="",
102 method=self._ps_secret_share,
103 async_=True,
104 )
105 host.bridge.addMethod(
106 "psSecretRevoke",
107 ".plugin",
108 in_sign="sssass",
109 out_sign="",
110 method=self._ps_secret_revoke,
111 async_=True,
112 )
113 host.bridge.addMethod(
114 "psSecretRotate",
115 ".plugin",
116 in_sign="ssass",
117 out_sign="",
118 method=self._ps_secret_rotate,
119 async_=True,
120 )
121 host.bridge.addMethod(
122 "psSecretsList",
123 ".plugin",
124 in_sign="sss",
125 out_sign="s",
126 method=self._ps_secrets_list,
127 async_=True,
128 )
129
130 def getHandler(self, client):
131 return PubsubEncryption_Handler()
132
133 async def profileConnecting(self, client):
134 client.__storage = persistent.LazyPersistentBinaryDict(
135 IMPORT_NAME, client.profile
136 )
137 # cache to avoid useless DB access, and to avoid race condition by ensuring that
138 # the same shared_secrets instance is always used for a given node.
139 client.__cache = OrderedDict()
140 self.gpg_provider = get_gpg_provider(self.host, client)
141
142 async def load_secrets(
143 self,
144 client: SatXMPPEntity,
145 node_uri: str
146 ) -> Optional[Dict[str, SharedSecret]]:
147 """Load shared secret from databse or cache
148
149 A cache is used per client to avoid usueless db access, as shared secrets are
150 often needed several times in a row. Cache is also necessary to avoir race
151 condition, when updating a secret, by ensuring that the same instance is used
152 for all updates during a session.
153
154 @param node_uri: XMPP URI of the encrypted pubsub node
155 @return shared secrets, or None if no secrets are known yet
156 """
157 try:
158 shared_secrets = client.__cache[node_uri]
159 except KeyError:
160 pass
161 else:
162 client.__cache.move_to_end(node_uri)
163 return shared_secrets
164
165 secrets_as_dict = await client.__storage.get(node_uri)
166
167 if secrets_as_dict is None:
168 return None
169 else:
170 shared_secrets = {
171 s["id"]: SharedSecret(
172 id=s["id"],
173 key=s["key"],
174 timestamp=s["timestamp"],
175 origin=jid.JID(s["origin"]),
176 revoked=s["revoked"],
177 shared_with={jid.JID(w) for w in s["shared_with"]}
178 ) for s in secrets_as_dict
179 }
180 client.__cache[node_uri] = shared_secrets
181 while len(client.__cache) > CACHE_MAX:
182 client.__cache.popitem(False)
183 return shared_secrets
184
185 def __secrect_dict_factory(self, data: List[Tuple[str, Any]]) -> Dict[str, Any]:
186 ret = {}
187 for k, v in data:
188 if k == "origin":
189 v = v.full()
190 elif k == "shared_with":
191 v = [j.full() for j in v]
192 ret[k] = v
193 return ret
194
195 async def store_secrets(
196 self,
197 client: SatXMPPEntity,
198 node_uri: str,
199 shared_secrets: Dict[str, SharedSecret]
200 ) -> None:
201 """Store shared secrets to database
202
203 Shared secrets are serialised before being stored.
204 If ``node_uri`` is not in cache, the shared_secrets instance is also put in cache/
205
206 @param node_uri: XMPP URI of the encrypted pubsub node
207 @param shared_secrets: shared secrets to store
208 """
209 if node_uri not in client.__cache:
210 client.__cache[node_uri] = shared_secrets
211 while len(client.__cache) > CACHE_MAX:
212 client.__cache.popitem(False)
213
214 secrets_as_dict = [
215 dataclasses.asdict(s, dict_factory=self.__secrect_dict_factory)
216 for s in shared_secrets.values()
217 ]
218 await client.__storage.aset(node_uri, secrets_as_dict)
219
220 def generate_secret(self, client: SatXMPPEntity) -> SharedSecret:
221 """Generate a new shared secret"""
222 log.info("Generating a new shared secret.")
223 secret_key = secrets.token_urlsafe(64)
224 secret_id = shortuuid.uuid()
225 return SharedSecret(
226 id = secret_id,
227 key = secret_key,
228 timestamp = time.time(),
229 origin = client.jid.userhostJID()
230 )
231
232 def _ps_secret_revoke(
233 self,
234 service: str,
235 node: str,
236 secret_id: str,
237 recipients: List[str],
238 profile_key: str
239 ) -> defer.Deferred:
240 return defer.ensureDeferred(
241 self.revoke(
242 self.host.getClient(profile_key),
243 jid.JID(service) if service else None,
244 node,
245 secret_id,
246 [jid.JID(r) for r in recipients] or None,
247 )
248 )
249
250 async def revoke(
251 self,
252 client: SatXMPPEntity,
253 service: Optional[jid.JID],
254 node: str,
255 secret_id: str,
256 recipients: Optional[Iterable[jid.JID]] = None
257 ) -> None:
258 """Revoke a secret and notify entities
259
260 @param service: pubsub/PEP service where the node is
261 @param node: node name
262 @param secret_id: ID of the secret to revoke (must have been generated by
263 ourselves)
264 recipients: JIDs of entities to send the revocation notice to. If None, all
265 entities known to have the shared secret will be notified.
266 Use empty list if you don't want to notify anybody (not recommended)
267 """
268 if service is None:
269 service = client.jid.userhostJID()
270 node_uri = uri.buildXMPPUri("pubsub", path=service.full(), node=node)
271 shared_secrets = await self.load_secrets(client, node_uri)
272 if not shared_secrets:
273 raise exceptions.NotFound(f"No shared secret is known for {node_uri}")
274 try:
275 shared_secret = shared_secrets[secret_id]
276 except KeyError:
277 raise exceptions.NotFound(
278 f"No shared secret with ID {secret_id!r} has been found for {node_uri}"
279 )
280 else:
281 if shared_secret.origin != client.jid.userhostJID():
282 raise exceptions.PermissionError(
283 f"The shared secret {shared_secret.id} originate from "
284 f"{shared_secret.origin}, not you ({client.jid.userhostJID()}). You "
285 "can't revoke it"
286 )
287 shared_secret.revoked = True
288 await self.store_secrets(client, node_uri, shared_secrets)
289 log.info(
290 f"shared secret {secret_id!r} for {node_uri} has been revoked."
291 )
292 if recipients is None:
293 recipients = shared_secret.shared_with
294 if recipients:
295 for recipient in recipients:
296 await self.send_revoke_notification(
297 client, service, node, shared_secret.id, recipient
298 )
299 log.info(
300 f"shared secret {shared_secret.id} revocation notification for "
301 f"{node_uri} has been send to {''.join(str(r) for r in recipients)}"
302 )
303 else:
304 log.info(
305 "Due to empty recipients list, no revocation notification has been sent "
306 f"for shared secret {shared_secret.id} for {node_uri}"
307 )
308
309 async def send_revoke_notification(
310 self,
311 client: SatXMPPEntity,
312 service: jid.JID,
313 node: str,
314 secret_id: str,
315 recipient: jid.JID
316 ) -> None:
317 revoke_elt = domish.Element((NS_OXPS, "revoke"))
318 revoke_elt["jid"] = service.full()
319 revoke_elt["node"] = node
320 revoke_elt["id"] = secret_id
321 signcrypt_elt, payload_elt = self._ox.build_signcrypt_element([recipient])
322 payload_elt.addChild(revoke_elt)
323 openpgp_elt = await self._ox.build_openpgp_element(
324 client, signcrypt_elt, {recipient}
325 )
326 message_elt = domish.Element((None, "message"))
327 message_elt["from"] = client.jid.full()
328 message_elt["to"] = recipient.full()
329 message_elt.addChild((openpgp_elt))
330 self._h.addHintElements(message_elt, [self._h.HINT_STORE])
331 client.send(message_elt)
332
333 def _ps_secret_share(
334 self,
335 recipient: str,
336 service: str,
337 node: str,
338 secret_ids: List[str],
339 profile_key: str
340 ) -> defer.Deferred:
341 return defer.ensureDeferred(
342 self.share_secrets(
343 self.host.getClient(profile_key),
344 jid.JID(recipient),
345 jid.JID(service) if service else None,
346 node,
347 secret_ids or None,
348 )
349 )
350
351 async def share_secret(
352 self,
353 client: SatXMPPEntity,
354 service: Optional[jid.JID],
355 node: str,
356 shared_secret: SharedSecret,
357 recipient: jid.JID
358 ) -> None:
359 """Create and send <shared-secret> element"""
360 if service is None:
361 service = client.jid.userhostJID()
362 shared_secret_elt = domish.Element((NS_OXPS, "shared-secret"))
363 shared_secret_elt["jid"] = service.full()
364 shared_secret_elt["node"] = node
365 shared_secret_elt["id"] = shared_secret.id
366 shared_secret_elt["timestamp"] = utils.xmpp_date(shared_secret.timestamp)
367 if shared_secret.revoked:
368 shared_secret_elt["revoked"] = C.BOOL_TRUE
369 # TODO: add type attribute
370 shared_secret_elt.addContent(shared_secret.key)
371 signcrypt_elt, payload_elt = self._ox.build_signcrypt_element([recipient])
372 payload_elt.addChild(shared_secret_elt)
373 openpgp_elt = await self._ox.build_openpgp_element(
374 client, signcrypt_elt, {recipient}
375 )
376 message_elt = domish.Element((None, "message"))
377 message_elt["from"] = client.jid.full()
378 message_elt["to"] = recipient.full()
379 message_elt.addChild((openpgp_elt))
380 self._h.addHintElements(message_elt, [self._h.HINT_STORE])
381 client.send(message_elt)
382 shared_secret.shared_with.add(recipient)
383
384 async def share_secrets(
385 self,
386 client: SatXMPPEntity,
387 recipient: jid.JID,
388 service: Optional[jid.JID],
389 node: str,
390 secret_ids: Optional[List[str]] = None,
391 ) -> None:
392 """Share secrets of a pubsub node with a recipient
393
394 @param recipient: who to share secrets with
395 @param service: pubsub/PEP service where the node is
396 @param node: node name
397 @param secret_ids: IDs of the secrets to share, or None to share all known secrets
398 (disabled or not)
399 """
400 if service is None:
401 service = client.jid.userhostJID()
402 node_uri = uri.buildXMPPUri("pubsub", path=service.full(), node=node)
403 shared_secrets = await self.load_secrets(client, node_uri)
404 if shared_secrets is None:
405 # no secret shared yet, let's generate one
406 shared_secret = self.generate_secret(client)
407 shared_secrets = {shared_secret.id: shared_secret}
408 await self.store_secrets(client, node_uri, shared_secrets)
409 if secret_ids is None:
410 # we share all secrets of the node
411 to_share = shared_secrets.values()
412 else:
413 try:
414 to_share = [shared_secrets[s_id] for s_id in secret_ids]
415 except KeyError as e:
416 raise exceptions.NotFound(
417 f"no shared secret found with given ID: {e}"
418 )
419 for shared_secret in to_share:
420 await self.share_secret(client, service, node, shared_secret, recipient)
421 await self.store_secrets(client, node_uri, shared_secrets)
422
423 def _ps_secret_rotate(
424 self,
425 service: str,
426 node: str,
427 recipients: List[str],
428 profile_key: str,
429 ) -> defer.Deferred:
430 return defer.ensureDeferred(
431 self.rotate_secret(
432 self.host.getClient(profile_key),
433 jid.JID(service) if service else None,
434 node,
435 [jid.JID(r) for r in recipients] or None
436 )
437 )
438
439 async def rotate_secret(
440 self,
441 client: SatXMPPEntity,
442 service: Optional[jid.JID],
443 node: str,
444 recipients: Optional[List[jid.JID]] = None
445 ) -> None:
446 """Revoke all current known secrets, create and share a new one
447
448 @param service: pubsub/PEP service where the node is
449 @param node: node name
450 @param recipients: who must receive the new shared secret
451 if None, all recipients known to have last active shared secret will get the
452 new secret
453 """
454 if service is None:
455 service = client.jid.userhostJID()
456 node_uri = uri.buildXMPPUri("pubsub", path=service.full(), node=node)
457 shared_secrets = await self.load_secrets(client, node_uri)
458 if shared_secrets is None:
459 shared_secrets = {}
460 for shared_secret in shared_secrets.values():
461 if not shared_secret.revoked:
462 await self.revoke(client, service, node, shared_secret.id)
463 shared_secret.revoked = True
464
465 if recipients is None:
466 if shared_secrets:
467 # we get recipients from latests shared secret's shared_with list,
468 # regarless of deprecation (cause all keys may be deprecated)
469 recipients = list(sorted(
470 shared_secrets.values(),
471 key=lambda s: s.timestamp,
472 reverse=True
473 )[0].shared_with)
474 else:
475 recipients = []
476
477 shared_secret = self.generate_secret(client)
478 shared_secrets[shared_secret.id] = shared_secret
479 # we send notification to last entities known to already have the shared secret
480 for recipient in recipients:
481 await self.share_secret(client, service, node, shared_secret, recipient)
482 await self.store_secrets(client, node_uri, shared_secrets)
483
484 def _ps_secrets_list(
485 self,
486 service: str,
487 node: str,
488 profile_key: str
489 ) -> defer.Deferred:
490 d = defer.ensureDeferred(
491 self.list_shared_secrets(
492 self.host.getClient(profile_key),
493 jid.JID(service) if service else None,
494 node,
495 )
496 )
497 d.addCallback(lambda ret: data_format.serialise(ret))
498 return d
499
500 async def list_shared_secrets(
501 self,
502 client: SatXMPPEntity,
503 service: Optional[jid.JID],
504 node: str,
505 ) -> List[Dict[str, Any]]:
506 """Retrieve for shared secrets of a pubsub node
507
508 @param service: pubsub/PEP service where the node is
509 @param node: node name
510 @return: shared secrets data
511 @raise exceptions.NotFound: no shared secret found for this node
512 """
513 if service is None:
514 service = client.jid.userhostJID()
515 node_uri = uri.buildXMPPUri("pubsub", path=service.full(), node=node)
516 shared_secrets = await self.load_secrets(client, node_uri)
517 if shared_secrets is None:
518 raise exceptions.NotFound(f"No shared secrets found for {node_uri}")
519 return [
520 dataclasses.asdict(s, dict_factory=self.__secrect_dict_factory)
521 for s in shared_secrets.values()
522 ]
523
524 async def handle_revoke_elt(
525 self,
526 client: SatXMPPEntity,
527 sender: jid.JID,
528 revoke_elt: domish.Element
529 ) -> None:
530 """Parse a <revoke> element and update local secrets
531
532 @param sender: bare jid of the entity who has signed the secret
533 @param revoke: <revoke/> element
534 """
535 try:
536 service = jid.JID(revoke_elt["jid"])
537 node = revoke_elt["node"]
538 secret_id = revoke_elt["id"]
539 except (KeyError, RuntimeError) as e:
540 log.warning(
541 f"ignoring invalid <revoke> element: {e}\n{revoke_elt.toXml()}"
542 )
543 return
544 node_uri = uri.buildXMPPUri("pubsub", path=service.full(), node=node)
545 shared_secrets = await self.load_secrets(client, node_uri)
546 if shared_secrets is None:
547 log.warning(
548 f"Can't revoke shared secret {secret_id}: no known shared secrets for "
549 f"{node_uri}"
550 )
551 return
552
553 if any(s.origin != sender for s in shared_secrets.values()):
554 log.warning(
555 f"Rejecting shared secret revocation signed by invalid entity ({sender}):"
556 f"\n{revoke_elt.toXml}"
557 )
558 return
559
560 try:
561 shared_secret = shared_secrets[secret_id]
562 except KeyError:
563 log.warning(
564 f"Can't revoke shared secret {secret_id}: this secret ID is unknown for "
565 f"{node_uri}"
566 )
567 return
568
569 shared_secret.revoked = True
570 await self.store_secrets(client, node_uri, shared_secrets)
571 log.info(f"Shared secret {secret_id} has been revoked for {node_uri}")
572
573 async def handle_shared_secret_elt(
574 self,
575 client: SatXMPPEntity,
576 sender: jid.JID,
577 shared_secret_elt: domish.Element
578 ) -> None:
579 """Parse a <shared-secret> element and update local secrets
580
581 @param sender: bare jid of the entity who has signed the secret
582 @param shared_secret_elt: <shared-secret/> element
583 """
584 try:
585 service = jid.JID(shared_secret_elt["jid"])
586 node = shared_secret_elt["node"]
587 secret_id = shared_secret_elt["id"]
588 timestamp = utils.parse_xmpp_date(shared_secret_elt["timestamp"])
589 # TODO: handle "type" attribute
590 revoked = C.bool(shared_secret_elt.getAttribute("revoked", C.BOOL_FALSE))
591 except (KeyError, RuntimeError, ValueError) as e:
592 log.warning(
593 f"ignoring invalid <shared-secret> element: "
594 f"{e}\n{shared_secret_elt.toXml()}"
595 )
596 return
597 key = str(shared_secret_elt)
598 if not key:
599 log.warning(
600 "ignoring <shared-secret> element with empty key: "
601 f"{shared_secret_elt.toXml()}"
602 )
603 return
604 shared_secret = SharedSecret(
605 id=secret_id, key=key, timestamp=timestamp, origin=sender, revoked=revoked
606 )
607 node_uri = uri.buildXMPPUri("pubsub", path=service.full(), node=node)
608 shared_secrets = await self.load_secrets(client, node_uri)
609 if shared_secrets is None:
610 shared_secrets = {}
611 # no known shared secret yet for this node, we have to trust first user who
612 # send it
613 else:
614 if any(s.origin != sender for s in shared_secrets.values()):
615 log.warning(
616 f"Rejecting shared secret signed by invalid entity ({sender}):\n"
617 f"{shared_secret_elt.toXml}"
618 )
619 return
620
621 shared_secrets[shared_secret.id] = shared_secret
622 await self.store_secrets(client, node_uri, shared_secrets)
623 log.info(
624 f"shared secret {shared_secret.id} added for {node_uri} [{client.profile}]"
625 )
626
627 async def _publish_trigger(
628 self,
629 client: SatXMPPEntity,
630 service: jid.JID,
631 node: str,
632 items: Optional[List[domish.Element]],
633 options: Optional[dict],
634 sender: jid.JID,
635 extra: Dict[str, Any]
636 ) -> bool:
637 if not items or not extra.get("encrypted"):
638 return True
639 node_uri = uri.buildXMPPUri("pubsub", path=service.full(), node=node)
640 shared_secrets = await self.load_secrets(client, node_uri)
641 if shared_secrets is None:
642 shared_secrets = {}
643 shared_secret = None
644 else:
645 current_secrets = [s for s in shared_secrets.values() if not s.revoked]
646 if not current_secrets:
647 shared_secret = None
648 elif len(current_secrets) > 1:
649 log.warning(
650 f"more than one active shared secret found for node {node!r} at "
651 f"{service}, using the most recent one"
652 )
653 current_secrets.sort(key=lambda s: s.timestamp, reverse=True)
654 shared_secret = current_secrets[0]
655 else:
656 shared_secret = current_secrets[0]
657
658 if shared_secret is None:
659 if any(s.origin != client.jid.userhostJID() for s in shared_secrets.values()):
660 raise exceptions.PermissionError(
661 "there is no known active shared secret, and you are not the "
662 "creator of previous shared secrets, we can't encrypt items at "
663 f"{node_uri} ."
664 )
665 shared_secret = self.generate_secret(client)
666 shared_secrets[shared_secret.id] = shared_secret
667 await self.store_secrets(client, node_uri, shared_secrets)
668 # TODO: notify other entities
669
670 for item in items:
671 item_elts = list(item.elements())
672 if len(item_elts) != 1:
673 raise ValueError(
674 f"there should be exactly one item payload: {item.toXml()}"
675 )
676 item_payload = item_elts[0]
677 log.debug(f"encrypting item {item.getAttribute('id', '')}")
678 encrypted_item = self.gpg_provider.encrypt_symmetrically(
679 item_payload.toXml().encode(), shared_secret.key
680 )
681 item.children.clear()
682 encrypted_elt = domish.Element((NS_OXPS, "encrypted"))
683 encrypted_elt["key"] = shared_secret.id
684 encrypted_elt.addContent(base64.b64encode(encrypted_item).decode())
685 item.addChild(encrypted_elt)
686
687 return True
688
689 async def _items_trigger(
690 self,
691 client: SatXMPPEntity,
692 service: Optional[jid.JID],
693 node: str,
694 items: List[domish.Element],
695 rsm_response: rsm.RSMResponse,
696 extra: Dict[str, Any],
697 ) -> None:
698 if not extra.get(C.KEY_DECRYPT, True):
699 return
700 if service is None:
701 service = client.jid.userhostJID()
702 shared_secrets = None
703 for item in items:
704 payload = item.firstChildElement()
705 if (payload is not None
706 and payload.name == "encrypted"
707 and payload.uri == NS_OXPS):
708 encrypted_elt = payload
709 secret_id = encrypted_elt.getAttribute("key")
710 if not secret_id:
711 log.warning(
712 f'"key" attribute is missing from encrypted item: {item.toXml()}'
713 )
714 continue
715 if shared_secrets is None:
716 node_uri = uri.buildXMPPUri("pubsub", path=service.full(), node=node)
717 shared_secrets = await self.load_secrets(client, node_uri)
718 if shared_secrets is None:
719 log.warning(
720 f"No known shared secret for {node_uri}, can't decrypt"
721 )
722 return
723 try:
724 shared_secret = shared_secrets[secret_id]
725 except KeyError:
726 log.warning(
727 f"No key known for encrypted item {item['id']!r} (shared secret "
728 f"id: {secret_id!r})"
729 )
730 continue
731 log.debug(f"decrypting item {item.getAttribute('id', '')}")
732 decrypted = self.gpg_provider.decrypt_symmetrically(
733 base64.b64decode(str(encrypted_elt)),
734 shared_secret.key
735 )
736 decrypted_elt = xml_tools.parse(decrypted)
737 item.children.clear()
738 item.addChild(decrypted_elt)
739
740 async def _message_received_trigger(
741 self,
742 client: SatXMPPEntity,
743 message_elt: domish.Element,
744 post_treat: defer.Deferred
745 ) -> bool:
746 sender = jid.JID(message_elt["from"]).userhostJID()
747 # there may be an openpgp element if OXIM is not activate, in this case we have to
748 # decrypt it here
749 openpgp_elt = next(message_elt.elements(NS_OX, "openpgp"), None)
750 if openpgp_elt is not None:
751 try:
752 payload_elt, __ = await self._ox.unpack_openpgp_element(
753 client,
754 openpgp_elt,
755 "signcrypt",
756 sender
757 )
758 except Exception as e:
759 log.warning(f"Can't decrypt element: {e}\n{message_elt.toXml()}")
760 return False
761 message_elt.children.remove(openpgp_elt)
762 for c in payload_elt.children:
763 message_elt.addChild(c)
764
765 shared_secret_elt = next(message_elt.elements(NS_OXPS, "shared-secret"), None)
766 if shared_secret_elt is None:
767 # no <shared-secret>, we check if there is a <revoke> element
768 revoke_elt = next(message_elt.elements(NS_OXPS, "revoke"), None)
769 if revoke_elt is None:
770 return True
771 else:
772 await self.handle_revoke_elt(client, sender, revoke_elt)
773 else:
774 await self.handle_shared_secret_elt(client, sender, shared_secret_elt)
775
776 return False
777
778
779 @implementer(iwokkel.IDisco)
780 class PubsubEncryption_Handler(xmlstream.XMPPHandler):
781
782 def getDiscoInfo(self, requestor, service, nodeIdentifier=""):
783 return [disco.DiscoFeature(NS_OXPS)]
784
785 def getDiscoItems(self, requestor, service, nodeIdentifier=""):
786 return []