Mercurial > libervia-backend
comparison sat/plugins/plugin_sec_pte.py @ 3972:5fbdf986670c
plugin pte: Pubsub Target Encryption implementation:
This plugin lets encrypt a few items for a specific set of entities.
rel 382
author | Goffi <goffi@goffi.org> |
---|---|
date | Mon, 31 Oct 2022 13:46:51 +0100 |
parents | |
children | 524856bd7b19 |
comparison
equal
deleted
inserted
replaced
3971:9b1d74a6b48c | 3972:5fbdf986670c |
---|---|
1 #!/usr/bin/env python3 | |
2 | |
3 # Libervia plugin for Pubsub Targeted Encryption | |
4 # Copyright (C) 2009-2022 Jérôme Poisson (goffi@goffi.org) | |
5 | |
6 # This program is free software: you can redistribute it and/or modify | |
7 # it under the terms of the GNU Affero General Public License as published by | |
8 # the Free Software Foundation, either version 3 of the License, or | |
9 # (at your option) any later version. | |
10 | |
11 # This program is distributed in the hope that it will be useful, | |
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of | |
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |
14 # GNU Affero General Public License for more details. | |
15 | |
16 # You should have received a copy of the GNU Affero General Public License | |
17 # along with this program. If not, see <http://www.gnu.org/licenses/>. | |
18 | |
19 from typing import Any, Dict, List, Optional | |
20 | |
21 from twisted.internet import defer | |
22 from twisted.words.protocols.jabber import jid, xmlstream | |
23 from twisted.words.xish import domish | |
24 from wokkel import disco, iwokkel | |
25 from wokkel import rsm | |
26 from zope.interface import implementer | |
27 | |
28 from sat.core import exceptions | |
29 from sat.core.constants import Const as C | |
30 from sat.core.core_types import SatXMPPEntity | |
31 from sat.core.i18n import _ | |
32 from sat.core.log import getLogger | |
33 | |
34 | |
35 log = getLogger(__name__) | |
36 | |
37 IMPORT_NAME = "PTE" | |
38 | |
39 PLUGIN_INFO = { | |
40 C.PI_NAME: "Pubsub Targeted Encryption", | |
41 C.PI_IMPORT_NAME: IMPORT_NAME, | |
42 C.PI_TYPE: C.PLUG_TYPE_XEP, | |
43 C.PI_MODES: C.PLUG_MODE_BOTH, | |
44 C.PI_PROTOCOLS: [], | |
45 C.PI_DEPENDENCIES: ["XEP-0060", "XEP-0384"], | |
46 C.PI_MAIN: "PTE", | |
47 C.PI_HANDLER: "yes", | |
48 C.PI_DESCRIPTION: _("""Encrypt some items to specific entities"""), | |
49 } | |
50 NS_PTE = "urn:xmpp:pte:0" | |
51 | |
52 | |
53 class PTE: | |
54 namespace = NS_PTE | |
55 | |
56 def __init__(self, host): | |
57 log.info(_("Pubsub Targeted Encryption plugin initialization")) | |
58 host.registerNamespace("pte", NS_PTE) | |
59 self.host = host | |
60 self._o = host.plugins["XEP-0384"] | |
61 host.trigger.add("XEP-0060_publish", self._publish_trigger) | |
62 host.trigger.add("XEP-0060_items", self._items_trigger) | |
63 | |
64 def getHandler(self, client): | |
65 return PTE_Handler() | |
66 | |
67 async def _publish_trigger( | |
68 self, | |
69 client: SatXMPPEntity, | |
70 service: jid.JID, | |
71 node: str, | |
72 items: Optional[List[domish.Element]], | |
73 options: Optional[dict], | |
74 sender: jid.JID, | |
75 extra: Dict[str, Any] | |
76 ) -> bool: | |
77 if not items or extra.get("encrypted_for") is None: | |
78 return True | |
79 encrypt_data = extra["encrypted_for"] | |
80 try: | |
81 targets = {jid.JID(t) for t in encrypt_data["targets"]} | |
82 except (KeyError, RuntimeError): | |
83 raise exceptions.DataError(f"Invalid encryption data: {encrypt_data}") | |
84 for item in items: | |
85 log.debug( | |
86 f"encrypting item {item.getAttribute('id', '')} for " | |
87 f"{', '.join(t.full() for t in targets)}" | |
88 ) | |
89 encryption_type = encrypt_data.get("type", self._o.NS_TWOMEMO) | |
90 if encryption_type != self._o.NS_TWOMEMO: | |
91 raise NotImplementedError("only TWOMEMO is supported for now") | |
92 await self._o.encrypt( | |
93 client, | |
94 self._o.NS_TWOMEMO, | |
95 item, | |
96 targets, | |
97 is_muc_message=False, | |
98 stanza_id=None | |
99 ) | |
100 item_elts = list(item.elements()) | |
101 if len(item_elts) != 1: | |
102 raise ValueError( | |
103 f"there should be exactly one item payload: {item.toXml()}" | |
104 ) | |
105 encrypted_payload = item_elts[0] | |
106 item.children.clear() | |
107 encrypted_elt = item.addElement((NS_PTE, "encrypted")) | |
108 encrypted_elt["by"] = sender.userhost() | |
109 encrypted_elt["type"] = encryption_type | |
110 encrypted_elt.addChild(encrypted_payload) | |
111 | |
112 return True | |
113 | |
114 async def _items_trigger( | |
115 self, | |
116 client: SatXMPPEntity, | |
117 service: Optional[jid.JID], | |
118 node: str, | |
119 items: List[domish.Element], | |
120 rsm_response: rsm.RSMResponse, | |
121 extra: Dict[str, Any], | |
122 ) -> bool: | |
123 if not extra.get(C.KEY_DECRYPT, True): | |
124 return True | |
125 if service is None: | |
126 service = client.jid.userhostJID() | |
127 for item in items: | |
128 payload = item.firstChildElement() | |
129 if (payload is not None | |
130 and payload.name == "encrypted" | |
131 and payload.uri == NS_PTE): | |
132 encrypted_elt = payload | |
133 item.children.clear() | |
134 try: | |
135 encryption_type = encrypted_elt.getAttribute("type") | |
136 encrypted_by = jid.JID(encrypted_elt["by"]) | |
137 except (KeyError, RuntimeError): | |
138 raise exceptions.DataError( | |
139 f"invalid <encrypted> element: {encrypted_elt.toXml()}" | |
140 ) | |
141 if encryption_type!= self._o.NS_TWOMEMO: | |
142 raise NotImplementedError("only TWOMEMO is supported for now") | |
143 log.debug(f"decrypting item {item.getAttribute('id', '')}") | |
144 | |
145 # FIXME: we do use _message_received_trigger now to decrypt the stanza, a | |
146 # cleaner separated decrypt method should be used | |
147 encrypted_elt["from"] = encrypted_by.full() | |
148 if not await self._o._message_received_trigger( | |
149 client, | |
150 encrypted_elt, | |
151 defer.Deferred() | |
152 ) or not encrypted_elt.children: | |
153 raise exceptions.EncryptionError("can't decrypt the message") | |
154 | |
155 item.addChild(encrypted_elt.firstChildElement()) | |
156 | |
157 extra.setdefault("encrypted", {})[item["id"]] = { | |
158 "type": NS_PTE, | |
159 "algorithm": encryption_type | |
160 } | |
161 return True | |
162 | |
163 | |
164 @implementer(iwokkel.IDisco) | |
165 class PTE_Handler(xmlstream.XMPPHandler): | |
166 | |
167 def getDiscoInfo(self, requestor, service, nodeIdentifier=""): | |
168 return [disco.DiscoFeature(NS_PTE)] | |
169 | |
170 def getDiscoItems(self, requestor, service, nodeIdentifier=""): | |
171 return [] |