comparison sat/plugins/plugin_sec_pte.py @ 3972:5fbdf986670c

plugin pte: Pubsub Targeted Encryption implementation: This plugin lets you encrypt a few items for a specific set of entities. rel 382
author Goffi <goffi@goffi.org>
date Mon, 31 Oct 2022 13:46:51 +0100
parents
children 524856bd7b19
comparison
equal deleted inserted replaced
3971:9b1d74a6b48c 3972:5fbdf986670c
1 #!/usr/bin/env python3
2
3 # Libervia plugin for Pubsub Targeted Encryption
4 # Copyright (C) 2009-2022 Jérôme Poisson (goffi@goffi.org)
5
6 # This program is free software: you can redistribute it and/or modify
7 # it under the terms of the GNU Affero General Public License as published by
8 # the Free Software Foundation, either version 3 of the License, or
9 # (at your option) any later version.
10
11 # This program is distributed in the hope that it will be useful,
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 # GNU Affero General Public License for more details.
15
16 # You should have received a copy of the GNU Affero General Public License
17 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18
19 from typing import Any, Dict, List, Optional
20
21 from twisted.internet import defer
22 from twisted.words.protocols.jabber import jid, xmlstream
23 from twisted.words.xish import domish
24 from wokkel import disco, iwokkel
25 from wokkel import rsm
26 from zope.interface import implementer
27
28 from sat.core import exceptions
29 from sat.core.constants import Const as C
30 from sat.core.core_types import SatXMPPEntity
31 from sat.core.i18n import _
32 from sat.core.log import getLogger
33
34
# Module-level logger, named after this module.
log = getLogger(__name__)

# XML namespace of the <encrypted> wrapper used by Pubsub Targeted Encryption.
NS_PTE = "urn:xmpp:pte:0"

# Name under which this plugin is imported/registered.
IMPORT_NAME = "PTE"

# Plugin metadata: declares the plugin's name, type, modes, dependencies,
# main class name and description.
PLUGIN_INFO = {
    C.PI_NAME: "Pubsub Targeted Encryption",
    C.PI_IMPORT_NAME: IMPORT_NAME,
    C.PI_TYPE: C.PLUG_TYPE_XEP,
    C.PI_MODES: C.PLUG_MODE_BOTH,
    C.PI_PROTOCOLS: [],
    # pubsub (XEP-0060) and XEP-0384 plugins are both required
    C.PI_DEPENDENCIES: ["XEP-0060", "XEP-0384"],
    C.PI_MAIN: "PTE",
    C.PI_HANDLER: "yes",
    C.PI_DESCRIPTION: _("""Encrypt some items to specific entities"""),
}
51
52
class PTE:
    """Pubsub Targeted Encryption: encrypt pubsub items for specific entities.

    On publication, items are encrypted through the XEP-0384 plugin for the
    entities listed in ``extra["encrypted_for"]["targets"]`` and wrapped in an
    ``<encrypted>`` element in the ``NS_PTE`` namespace. On retrieval, such
    wrapped items are decrypted in place.
    """
    namespace = NS_PTE

    def __init__(self, host):
        """Register the PTE namespace and hook the XEP-0060 triggers.

        @param host: main application object; its ``plugins`` mapping,
            ``registerNamespace`` and ``trigger.add`` are used here
        """
        log.info(_("Pubsub Targeted Encryption plugin initialization"))
        host.registerNamespace("pte", NS_PTE)
        self.host = host
        # XEP-0384 plugin: performs the actual encryption and decryption
        self._o = host.plugins["XEP-0384"]
        host.trigger.add("XEP-0060_publish", self._publish_trigger)
        host.trigger.add("XEP-0060_items", self._items_trigger)

    def getHandler(self, client):
        """Return a new protocol handler advertising PTE support."""
        return PTE_Handler()

    async def _publish_trigger(
        self,
        client: SatXMPPEntity,
        service: jid.JID,
        node: str,
        items: Optional[List[domish.Element]],
        options: Optional[dict],
        sender: jid.JID,
        extra: Dict[str, Any]
    ) -> bool:
        """Encrypt items about to be published when targets are requested.

        Nothing is done when there is no item or when ``extra`` has no
        ``encrypted_for`` value. Otherwise each item is encrypted in place for
        the requested targets, and its single resulting payload is wrapped in
        an ``<encrypted>`` element carrying ``by`` (bare JID of the sender)
        and ``type`` (encryption algorithm namespace) attributes.

        @param client: client session doing the publication
        @param service: pubsub service (not used here)
        @param node: pubsub node (not used here)
        @param items: items to publish, modified in place
        @param options: publish options (not used here)
        @param sender: entity publishing the items
        @param extra: extra data; ``encrypted_for`` is expected to be a dict
            with a ``targets`` iterable of JID strings and an optional ``type``
        @return: always True
            NOTE(review): presumably tells the trigger manager to continue
            the normal workflow; confirm against the trigger implementation
        @raise exceptions.DataError: ``encrypted_for`` is malformed or one of
            the targets is not a valid JID
        @raise NotImplementedError: requested encryption type is not TWOMEMO
        @raise ValueError: an encrypted item doesn't hold exactly one payload
        """
        if not items or extra.get("encrypted_for") is None:
            return True
        encrypt_data = extra["encrypted_for"]
        try:
            targets = {jid.JID(t) for t in encrypt_data["targets"]}
        except (KeyError, TypeError, RuntimeError, jid.InvalidFormat) as e:
            # jid.InvalidFormat is not a RuntimeError subclass: it must be
            # caught explicitly so that a malformed JID is reported as invalid
            # data instead of leaking out of the trigger
            raise exceptions.DataError(
                f"Invalid encryption data: {encrypt_data}"
            ) from e

        # the encryption type is the same for all items: validate it once,
        # before anything is encrypted
        encryption_type = encrypt_data.get("type", self._o.NS_TWOMEMO)
        if encryption_type != self._o.NS_TWOMEMO:
            raise NotImplementedError("only TWOMEMO is supported for now")

        targets_str = ', '.join(t.full() for t in targets)
        for item in items:
            log.debug(
                f"encrypting item {item.getAttribute('id', '')} for "
                f"{targets_str}"
            )
            await self._o.encrypt(
                client,
                self._o.NS_TWOMEMO,
                item,
                targets,
                is_muc_message=False,
                stanza_id=None
            )
            item_elts = list(item.elements())
            if len(item_elts) != 1:
                raise ValueError(
                    f"there should be exactly one item payload: {item.toXml()}"
                )
            encrypted_payload = item_elts[0]
            # replace the item content with the <encrypted> wrapper
            item.children.clear()
            encrypted_elt = item.addElement((NS_PTE, "encrypted"))
            encrypted_elt["by"] = sender.userhost()
            encrypted_elt["type"] = encryption_type
            encrypted_elt.addChild(encrypted_payload)

        return True

    async def _items_trigger(
        self,
        client: SatXMPPEntity,
        service: Optional[jid.JID],
        node: str,
        items: List[domish.Element],
        rsm_response: rsm.RSMResponse,
        extra: Dict[str, Any],
    ) -> bool:
        """Decrypt retrieved items which are wrapped in a PTE ``<encrypted>``.

        Items whose first child element is not an ``<encrypted>`` element in
        the ``NS_PTE`` namespace are left untouched. For each decrypted item,
        ``extra["encrypted"][<item id>]`` is set to a dict with the ``type``
        (``NS_PTE``) and ``algorithm`` used.

        @param client: client session retrieving the items
        @param service: pubsub service (not used here)
        @param node: pubsub node (not used here)
        @param items: retrieved items, modified in place
        @param rsm_response: RSM response (not used here)
        @param extra: extra data; decryption is skipped when ``C.KEY_DECRYPT``
            is set to a false value
        @return: always True
            NOTE(review): presumably tells the trigger manager to continue
            the normal workflow; confirm against the trigger implementation
        @raise exceptions.DataError: the ``by`` attribute is missing or is not
            a valid JID
        @raise NotImplementedError: encryption type is not TWOMEMO
        @raise exceptions.EncryptionError: the payload can't be decrypted
        """
        if not extra.get(C.KEY_DECRYPT, True):
            return True
        for item in items:
            payload = item.firstChildElement()
            if (
                payload is None
                or payload.name != "encrypted"
                or payload.uri != NS_PTE
            ):
                # not a PTE encrypted item, nothing to do
                continue
            encrypted_elt = payload
            item.children.clear()
            # getAttribute returns None when "type" is missing; this is
            # rejected by the TWOMEMO check below
            encryption_type = encrypted_elt.getAttribute("type")
            try:
                encrypted_by = jid.JID(encrypted_elt["by"])
            except (KeyError, RuntimeError, jid.InvalidFormat) as e:
                # jid.InvalidFormat is not a RuntimeError subclass, it must
                # be caught explicitly to report a malformed "by" attribute
                raise exceptions.DataError(
                    f"invalid <encrypted> element: {encrypted_elt.toXml()}"
                ) from e
            if encryption_type != self._o.NS_TWOMEMO:
                raise NotImplementedError("only TWOMEMO is supported for now")
            log.debug(f"decrypting item {item.getAttribute('id', '')}")

            # FIXME: we do use _message_received_trigger now to decrypt the
            #   stanza, a cleaner separated decrypt method should be used
            encrypted_elt["from"] = encrypted_by.full()
            trigger_ok = await self._o._message_received_trigger(
                client,
                encrypted_elt,
                defer.Deferred()
            )
            if not trigger_ok or not encrypted_elt.children:
                raise exceptions.EncryptionError("can't decrypt the message")

            # put the decrypted payload back as the item content
            item.addChild(encrypted_elt.firstChildElement())

            extra.setdefault("encrypted", {})[item["id"]] = {
                "type": NS_PTE,
                "algorithm": encryption_type
            }
        return True
162
163
@implementer(iwokkel.IDisco)
class PTE_Handler(xmlstream.XMPPHandler):
    """Protocol handler exposing the PTE namespace through service discovery."""

    def getDiscoInfo(self, requestor, service, nodeIdentifier=""):
        """Return the disco info: a single feature for the ``NS_PTE`` namespace."""
        pte_feature = disco.DiscoFeature(NS_PTE)
        return [pte_feature]

    def getDiscoItems(self, requestor, service, nodeIdentifier=""):
        """Return an empty list: this handler exposes no disco items."""
        return list()