Mercurial > libervia-backend
comparison libervia/backend/plugins/plugin_sec_autocrypt.py @ 4351:6a0a081485b8
plugin autocrypt: Autocrypt protocol implementation:
Implementation of autocrypt: `autocrypt` header is checked, and if present and no public
key is known for the peer, the key is imported.
The `autocrypt` header is also added to outgoing messages (only if an email gateway is
detected).
For the moment, the JID is used as the identifier, but the real email address used by the
gateway should be used in the future.
rel 456
author | Goffi <goffi@goffi.org> |
---|---|
date | Fri, 28 Feb 2025 09:23:35 +0100 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
4350:6baea959dc33 | 4351:6a0a081485b8 |
---|---|
1 #!/usr/bin/env python3 | |
2 | |
3 # Libervia plugin | |
4 # Copyright (C) 2009-2025 Jérôme Poisson (goffi@goffi.org) | |
5 | |
6 # This program is free software: you can redistribute it and/or modify | |
7 # it under the terms of the GNU Affero General Public License as published by | |
8 # the Free Software Foundation, either version 3 of the License, or | |
9 # (at your option) any later version. | |
10 | |
11 # This program is distributed in the hope that it will be useful, | |
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of | |
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |
14 # GNU Affero General Public License for more details. | |
15 | |
16 # You should have received a copy of the GNU Affero General Public License | |
17 # along with this program. If not, see <http://www.gnu.org/licenses/>. | |
18 | |
19 import base64 | |
20 from functools import partial | |
21 from typing import TYPE_CHECKING, cast | |
22 from typing import Literal | |
23 | |
24 from pydantic import BaseModel, field_validator | |
25 from twisted.internet import defer | |
26 from twisted.words.protocols.jabber import jid | |
27 from twisted.words.xish import domish | |
28 | |
29 from libervia.backend.core.constants import Const as C | |
30 from libervia.backend.core.core_types import MessageData, SatXMPPEntity | |
31 from libervia.backend.core.i18n import _ | |
32 from libervia.backend.core.log import getLogger | |
33 from libervia.backend.memory import persistent | |
34 from libervia.backend.plugins.plugin_xep_0106 import XEP_0106 | |
35 from libervia.backend.plugins.plugin_xep_0131 import XEP_0131 | |
36 from libervia.backend.plugins.plugin_xep_0373 import get_gpg_provider | |
37 from libervia.backend.tools.common import regex | |
38 | |
39 if TYPE_CHECKING: | |
40 from libervia.backend.core.main import LiberviaBackend | |
41 | |
log = getLogger(__name__)


# Plugin declaration: name, import name, type, supported modes, dependencies and main
# class of this plugin.
PLUGIN_INFO = {
    C.PI_NAME: "Autocrypt",
    C.PI_IMPORT_NAME: "AUTOCRYPT",
    C.PI_TYPE: C.PLUG_TYPE_SEC,
    C.PI_MODES: C.PLUG_MODE_BOTH,
    C.PI_PROTOCOLS: [],
    # XEP-0106 and XEP-0131 plugins are retrieved in ``Autocrypt.__init__``, XEP-0373
    # provides the GPG provider.
    C.PI_DEPENDENCIES: ["XEP-0106", "XEP-0131", "XEP-0373"],
    C.PI_RECOMMENDATIONS: [],
    C.PI_MAIN: "Autocrypt",
    C.PI_HANDLER: "no",
    C.PI_DESCRIPTION: _(
        "Autocrypt support, to automatically encrypt message to email gateways "
        "when suitable."
    ),
}
60 | |
61 | |
class AutocryptHeaderParseError(ValueError):
    """Raised when Autocrypt header parsing fails

    This is a ``ValueError`` subclass, so code catching ``ValueError`` also catches it.
    """
64 | |
65 | |
class AutocryptData(BaseModel):
    """Parsed Autocrypt header data.

    @param addr: Email address for the key.
    @param keydata: Base64-encoded public key.
    @param prefer_encrypt: Encryption preference hint.
    """

    addr: str
    keydata: str
    prefer_encrypt: Literal["mutual"] | None = None

    @field_validator("addr")
    @classmethod
    def check_email(cls, value: str) -> str:
        """Strip surrounding whitespace and validate the email address.

        @param value: Raw ``addr`` value.
        @return: Stripped address.
        @raise ValueError: The address doesn't match ``regex.RE_EMAIL``.
        """
        value = value.strip()
        if not regex.RE_EMAIL.match(value):
            raise ValueError("Invalid email address")
        return value

    @field_validator("keydata")
    @classmethod
    def validate_keydata(cls, value: str) -> str:
        """Validate keydata is proper base64

        @param value: Raw ``keydata`` value, possibly containing whitespace.
        @return: Base64 payload with all whitespace removed.
        @raise ValueError: The payload is empty or is not valid base64.
        """
        # Email headers are usually folded, which leaves spaces or line breaks inside
        # the base64 payload. They must be removed BEFORE the strict validation below,
        # otherwise any folded key would be rejected.
        value = "".join(value.split())
        if not value:
            raise ValueError("Empty keydata")
        try:
            # ``binascii.Error`` is a ``ValueError`` subclass.
            base64.b64decode(value, validate=True)
        except ValueError as e:
            raise ValueError("Invalid base64 in keydata") from e
        return value

    def to_header(self) -> str:
        """Generate the Autocrypt header.

        @return: Formatted header value per Autocrypt specification.
        """
        parts = [f"addr={self.addr}"]

        if self.prefer_encrypt is not None:
            parts.append(f"prefer-encrypt={self.prefer_encrypt}")

        # Autocrypt Level 1 requires ``keydata`` to be the last attribute of the
        # header, so it is always appended after the optional attributes.
        parts.append(f"keydata={self.keydata}")

        return "; ".join(parts)
107 | |
108 | |
def parse_autocrypt_header(header: str) -> AutocryptData:
    """Parse an Autocrypt header.

    Attribute names are case-insensitive. Parts without a ``=`` separator and
    non-critical attributes (name starting with an underscore) are ignored. If an
    attribute appears several times, the last value is used.

    @param header: Raw Autocrypt header value
    @return: Parsed AutocryptData.
    @raise AutocryptHeaderParseError: Some required field is invalid or missing, or an
        unsupported critical attribute is present.
    """
    supported = ("addr", "keydata", "prefer-encrypt")
    attributes: dict[str, str] = {}
    for part in header.split(";"):
        key, separator, value = part.partition("=")
        key = key.strip().lower()
        if not separator or not key:
            # Ignore empty or invalid parts
            continue

        if key in supported:
            attributes[key] = value.strip()
        elif not key.startswith("_"):
            # Autocrypt Level 1: an unsupported attribute without leading underscore
            # is "critical", and the whole header MUST then be treated as invalid.
            raise AutocryptHeaderParseError(
                f"Unsupported critical attribute {key!r}"
            )

    if not attributes.get("addr"):
        raise AutocryptHeaderParseError('Missing required "addr" attribute')
    if not attributes.get("keydata"):
        raise AutocryptHeaderParseError('Missing required "keydata" attribute')

    # Autocrypt Level 1: any "prefer-encrypt" value other than "mutual" must be
    # interpreted as no preference, it must not invalidate the header.
    prefer_encrypt = attributes.get("prefer-encrypt")
    if prefer_encrypt != "mutual":
        prefer_encrypt = None

    try:
        return AutocryptData(
            addr=attributes["addr"],
            keydata=attributes["keydata"],
            prefer_encrypt=prefer_encrypt,
        )
    except ValueError as e:
        raise AutocryptHeaderParseError(f"Invalid Autocrypt header: {e}") from e
144 | |
145 | |
class Autocrypt:
    """Autocrypt handling for messages exchanged with email gateways.

    When a received message has an ``autocrypt`` header and no public key is known for
    the sender, the key from the header is imported. When a message is sent to an
    entity whose domain has a ``gateway``/``smtp`` identity, an ``autocrypt`` header is
    added with the first public key found for the sender.
    """

    def __init__(self, host: "LiberviaBackend") -> None:
        self.host = host
        log.info(f"plugin {PLUGIN_INFO[C.PI_NAME]!r} initialization")
        self._shim = cast(XEP_0131, host.plugins["XEP-0131"])
        self._e = cast(XEP_0106, host.plugins["XEP-0106"])
        host.trigger.add("message_received", self.message_received_trigger)
        host.trigger.add("sendMessage", self.send_message_trigger)

    def profile_connecting(self, client: SatXMPPEntity) -> None:
        """Initialise per-client Autocrypt state.

        @param client: Client session.
        """
        # Sender already handled.
        client._autocrypt_seen: set[str] = set()
        client._autocrypt_gpg_provider = get_gpg_provider(self.host, client)

    async def handle_autocrypt_data(
        self, client: SatXMPPEntity, mess_data: MessageData, autocrypt_data_raw: str
    ) -> None:
        """Process Autocrypt header from XMPP email gateway

        The key is imported only if the sender is a bare JID on a domain with a
        ``gateway``/``smtp`` identity, the header address matches the sender, and no
        public key is known yet for this sender. Invalid headers are logged and
        ignored, no exception is raised for them.

        @param client: Client session.
        @param mess_data: Message data.
        @param autocrypt_data_raw: Raw Autocrypt header value
        """
        from_jid = mess_data["from"]
        # The header is set by the sender, and the sender email is extracted from
        # ``from_jid`` below: it's the sender's domain which must be an email gateway,
        # not the recipient's one.
        is_email_gateway = await self.host.memory.disco.has_identity(
            client, "gateway", "smtp", jid.JID(from_jid.host)
        )
        if from_jid.resource or not is_email_gateway:
            log.warning("Ignoring Autocrypt header from non email gateway.")
            return

        try:
            autocrypt_data = parse_autocrypt_header(autocrypt_data_raw)
        except AutocryptHeaderParseError as e:
            log.error(f"Invalid Autocrypt header: {e}")
            return

        # The user part of the gateway JID is the escaped (XEP-0106) email address.
        sender_email = self._e.unescape(from_jid.user)
        if sender_email != autocrypt_data.addr:
            log.warning(
                f"Sender email ({sender_email!r}) doesn't match autocrypt header address"
                f" ({autocrypt_data.addr!r}), ignoring autocrypt data."
            )
            return

        if sender_email in client._autocrypt_seen:
            log.debug(f"We have already handled {sender_email!r} , nothing to do.")
            return

        gpg_provider = client._autocrypt_gpg_provider
        public_keys = gpg_provider.list_public_keys(sender_email)
        if not public_keys:
            log.debug(
                f"No public key found for {sender_email!r}, importing autocrypt data."
            )
            # FIXME: Maybe we should import the Autocrypt key in a separated location?
            #   Autocrypt is less secure than normal key management.
            gpg_provider.import_public_key(base64.b64decode(autocrypt_data.keydata))
        else:
            log.debug(
                f"There are already known public key for {sender_email}, we skipt "
                "autocrypt"
            )
        client._autocrypt_seen.add(sender_email)

    def _check_headers(
        self, client: SatXMPPEntity, mess_data: MessageData
    ) -> MessageData:
        """Start Autocrypt handling if the message has an ``autocrypt`` header.

        The handling runs in the background: the message data is returned immediately
        and unmodified.

        @param client: Client session.
        @param mess_data: Message data.
        @return: The same message data.
        """
        try:
            autocrypt_data = mess_data["extra"]["headers"]["autocrypt"]
        except KeyError:
            return mess_data

        d = defer.ensureDeferred(
            self.handle_autocrypt_data(client, mess_data, autocrypt_data)
        )
        # Nobody waits for this Deferred: log failures explicitly instead of leaving
        # an unhandled error behind.
        d.addErrback(
            lambda failure: log.error(f"Can't process Autocrypt header: {failure.value}")
        )
        return mess_data

    def message_received_trigger(
        self,
        client: SatXMPPEntity,
        message_elt: domish.Element,
        post_treat: defer.Deferred,
    ) -> Literal[True]:
        """Schedule the Autocrypt header check once the message has been treated.

        @param client: Client session.
        @param message_elt: Received message element (unused here).
        @param post_treat: Deferred to which the header check is added as callback.
        @return: Always True.
        """
        post_treat.addCallback(partial(self._check_headers, client))
        return True

    async def add_autocrypt_header(
        self, client: SatXMPPEntity, mess_data: MessageData
    ) -> MessageData:
        """Add the ``autocrypt`` header when the message is sent to an email gateway.

        The message data is returned unchanged if the recipient domain has no
        ``gateway``/``smtp`` identity, if no public key is found for the sender, or if
        the header can't be built.

        @param client: Client session.
        @param mess_data: Message data.
        @return: Message data, with ``extra["headers"]["autocrypt"]`` set when possible.
        """
        to_jid = mess_data["to"]
        is_email_gateway = await self.host.memory.disco.has_identity(
            client, "gateway", "smtp", jid.JID(to_jid.host)
        )
        if not is_email_gateway:
            return mess_data

        gpg_provider = client._autocrypt_gpg_provider
        # FIXME! We currently use from jid as email, but we would need to get sender
        #   email from gateway instead, as we don't know what is actually used, and it
        #   may differ from the JID.
        sender_email = mess_data["from"].userhost()
        try:
            public_key = next(iter(gpg_provider.list_public_keys(sender_email)))
        except StopIteration:
            log.debug("No public key found, can't set autocrypt header.")
            return mess_data

        exported_key = gpg_provider.export_public_key(public_key)
        try:
            autocrypt_data = AutocryptData(
                addr=sender_email,
                keydata=base64.b64encode(exported_key).decode("ascii"),
                prefer_encrypt="mutual",
            )
        except ValueError as e:
            # The model validation fails e.g. if the sender JID is not a valid email
            # address. The header is optional, it must not prevent message sending.
            log.warning(f"Can't build Autocrypt header for {sender_email!r}: {e}")
            return mess_data

        mess_data["extra"].setdefault("headers", {})[
            "autocrypt"
        ] = autocrypt_data.to_header()
        return mess_data

    def send_message_trigger(
        self, client, mess_data, pre_xml_treatments, post_xml_treatments
    ) -> Literal[True]:
        """Schedule the addition of the Autocrypt header to the outgoing message.

        @param client: Client session.
        @param mess_data: Message data (unused here, the callback receives its own).
        @param pre_xml_treatments: Unused here.
        @param post_xml_treatments: Deferred to which the header addition is added as
            callback.
        @return: Always True.
        """
        post_xml_treatments.addCallback(
            lambda mess_data: defer.ensureDeferred(
                self.add_autocrypt_header(client, mess_data)
            )
        )
        return True