comparison libervia/backend/plugins/plugin_sec_autocrypt.py @ 4351:6a0a081485b8

plugin autocrypt: Autocrypt protocol implementation: Implementation of autocrypt: `autocrypt` header is checked, and if present and no public key is known for the peer, the key is imported. `autocrypt` header is also added to outgoing message (only if an email gateway is detected). For the moment, the JID is use as identifier, but the real email used by gateway should be used in the future. rel 456
author Goffi <goffi@goffi.org>
date Fri, 28 Feb 2025 09:23:35 +0100
parents
children
comparison
equal deleted inserted replaced
4350:6baea959dc33 4351:6a0a081485b8
1 #!/usr/bin/env python3
2
3 # Libervia plugin
4 # Copyright (C) 2009-2025 Jérôme Poisson (goffi@goffi.org)
5
6 # This program is free software: you can redistribute it and/or modify
7 # it under the terms of the GNU Affero General Public License as published by
8 # the Free Software Foundation, either version 3 of the License, or
9 # (at your option) any later version.
10
11 # This program is distributed in the hope that it will be useful,
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 # GNU Affero General Public License for more details.
15
16 # You should have received a copy of the GNU Affero General Public License
17 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18
19 import base64
20 from functools import partial
21 from typing import TYPE_CHECKING, cast
22 from typing import Literal
23
24 from pydantic import BaseModel, field_validator
25 from twisted.internet import defer
26 from twisted.words.protocols.jabber import jid
27 from twisted.words.xish import domish
28
29 from libervia.backend.core.constants import Const as C
30 from libervia.backend.core.core_types import MessageData, SatXMPPEntity
31 from libervia.backend.core.i18n import _
32 from libervia.backend.core.log import getLogger
33 from libervia.backend.memory import persistent
34 from libervia.backend.plugins.plugin_xep_0106 import XEP_0106
35 from libervia.backend.plugins.plugin_xep_0131 import XEP_0131
36 from libervia.backend.plugins.plugin_xep_0373 import get_gpg_provider
37 from libervia.backend.tools.common import regex
38
39 if TYPE_CHECKING:
40 from libervia.backend.core.main import LiberviaBackend
41
42 log = getLogger(__name__)
43
44
45 PLUGIN_INFO = {
46 C.PI_NAME: "Autocrypt",
47 C.PI_IMPORT_NAME: "AUTOCRYPT",
48 C.PI_TYPE: C.PLUG_TYPE_SEC,
49 C.PI_MODES: C.PLUG_MODE_BOTH,
50 C.PI_PROTOCOLS: [],
51 C.PI_DEPENDENCIES: ["XEP-0106", "XEP-0131", "XEP-0373"],
52 C.PI_RECOMMENDATIONS: [],
53 C.PI_MAIN: "Autocrypt",
54 C.PI_HANDLER: "no",
55 C.PI_DESCRIPTION: _(
56 "Autocrypt support, to automatically encrypt message to email gateways when "
57 "suitable."
58 ),
59 }
60
61
62 class AutocryptHeaderParseError(ValueError):
63 """Raised when Autocrypt header parsing fails"""
64
65
66 class AutocryptData(BaseModel):
67 """Parsed Autocrypt header data.
68
69 @param addr: Email address for the key.
70 @param keydata: Base64-encoded public key.
71 @param prefer_encrypt: Encryption preference hint.
72 """
73
74 addr: str
75 keydata: str
76 prefer_encrypt: Literal["mutual"] | None = None
77
78 @field_validator("addr")
79 @classmethod
80 def check_email(cls, value):
81 value = value.strip()
82 if not regex.RE_EMAIL.match(value):
83 raise ValueError("Invalid email address")
84 return value
85
86 @field_validator("keydata")
87 @classmethod
88 def validate_keydata(cls, value: str) -> str:
89 """Validate keydata is proper base64"""
90 try:
91 base64.b64decode(value, validate=True)
92 except ValueError as e:
93 raise ValueError("Invalid base64 in keydata") from e
94 return value.strip()
95
96 def to_header(self) -> str:
97 """Generate the Autocrypt header.
98
99 @return: Formatted header value per Autocrypt specification.
100 """
101 parts = [f"addr={self.addr}", f"keydata={self.keydata}"]
102
103 if self.prefer_encrypt is not None:
104 parts.append(f"prefer-encrypt={self.prefer_encrypt}")
105
106 return "; ".join(parts)
107
108
109 def parse_autocrypt_header(header: str) -> AutocryptData:
110 """Parse an Autocrypt header.
111
112 @param header: Raw Autocrypt header value
113 @return: Parsed AutocryptData.
114 @raise AutocryptHeaderParseError: Some required field is invalid or missing.
115 """
116 attributes = {}
117 for part in header.split(";"):
118 part = part.strip()
119 if not part:
120 continue
121 if "=" not in part:
122 # Ignore invalid parts
123 continue
124 key, value = part.split("=", 1)
125 key = key.strip().lower()
126 value = value.strip()
127
128 if key in {"addr", "keydata", "prefer-encrypt"}:
129 attributes[key] = value
130
131 if not attributes.get("addr"):
132 raise AutocryptHeaderParseError('Missing required "addr" attribute')
133 if not attributes.get("keydata"):
134 raise AutocryptHeaderParseError('Missing required "keydata" attribute')
135
136 try:
137 return AutocryptData(
138 addr=attributes["addr"],
139 keydata=attributes["keydata"],
140 prefer_encrypt=attributes.get("prefer-encrypt"),
141 )
142 except ValueError as e:
143 raise AutocryptHeaderParseError(f"Invalid Autocrypt header: {e}") from e
144
145
146 class Autocrypt:
147
148 def __init__(self, host: "LiberviaBackend") -> None:
149 self.host = host
150 log.info(f"plugin {PLUGIN_INFO[C.PI_NAME]!r} initialization")
151 self._shim = cast(XEP_0131, host.plugins["XEP-0131"])
152 self._e = cast(XEP_0106, host.plugins["XEP-0106"])
153 host.trigger.add("message_received", self.message_received_trigger)
154 host.trigger.add("sendMessage", self.send_message_trigger)
155
156 def profile_connecting(self, client: SatXMPPEntity) -> None:
157 # Sender already handled.
158 client._autocrypt_seen: set[str] = set()
159 client._autocrypt_gpg_provider = get_gpg_provider(self.host, client)
160
161 async def handle_autocrypt_data(
162 self, client: SatXMPPEntity, mess_data: MessageData, autocrypt_data_raw: str
163 ) -> None:
164 """Process Autocrypt header from XMPP email gateway
165
166 @param client: Client session.
167 @param mess_data: Message data.
168 @param autocrypt_data: Raw Autocrypt header value
169 @raise AutocryptHeaderParseError: For invalid header format
170 """
171 from_jid = mess_data["from"]
172 to_jid = mess_data["to"]
173 is_email_gateway = await self.host.memory.disco.has_identity(
174 client, "gateway", "smtp", jid.JID(to_jid.host)
175 )
176 if to_jid.resource or not is_email_gateway:
177 log.warning("Ignoring Autocrypt header from non email gateway.")
178 return
179
180 try:
181 autocrypt_data = parse_autocrypt_header(autocrypt_data_raw)
182 except AutocryptHeaderParseError as e:
183 log.error(f"Invalid Autocrypt header: {e}")
184 return
185
186 sender_email = self._e.unescape(from_jid.user)
187 if sender_email != autocrypt_data.addr:
188 log.warning(
189 f"Sender email ({sender_email!r}) doesn't match autocrypt header address"
190 f" ({autocrypt_data.addr!r}), ignoring autocrypt data."
191 )
192 return
193
194 if sender_email in client._autocrypt_seen:
195 log.debug(f"We have already handled {sender_email!r} , nothing to do.")
196 return None
197 gpg_provider = client._autocrypt_gpg_provider
198 public_keys = gpg_provider.list_public_keys(sender_email)
199 if not public_keys:
200 log.debug(
201 f"No public key found for {sender_email!r}, importing autocrypt data."
202 )
203 # FIXME: Maybe we should import the Autocrypt key in a separated location?
204 # Autocrypt is less secure than normal key management.
205 gpg_provider.import_public_key(base64.b64decode(autocrypt_data.keydata))
206 else:
207 log.debug(
208 f"There are already known public key for {sender_email}, we skipt "
209 "autocrypt"
210 )
211 client._autocrypt_seen.add(sender_email)
212
213 def _check_headers(
214 self, client: SatXMPPEntity, mess_data: MessageData
215 ) -> MessageData:
216 try:
217 autocrypt_data = mess_data["extra"]["headers"]["autocrypt"]
218 except KeyError:
219 pass
220 else:
221 defer.ensureDeferred(
222 self.handle_autocrypt_data(client, mess_data, autocrypt_data)
223 )
224 return mess_data
225
226 def message_received_trigger(
227 self,
228 client: SatXMPPEntity,
229 message_elt: domish.Element,
230 post_treat: defer.Deferred,
231 ) -> Literal[True]:
232 post_treat.addCallback(partial(self._check_headers, client))
233 return True
234
235 async def add_autocrypt_header(self, client, mess_data: MessageData) -> MessageData:
236 to_jid = mess_data["to"]
237 if await self.host.memory.disco.has_identity(
238 client, "gateway", "smtp", jid.JID(to_jid.host)
239 ):
240 gpg_provider = client._autocrypt_gpg_provider
241 # FIXME! We currently use from jid as email, but we would need to get sender
242 # email from gateway instead, as we don't know what is actually used, and it
243 # may differ from the JID.
244 sender_email = mess_data["from"].userhost()
245 try:
246 public_key = next(iter(gpg_provider.list_public_keys(sender_email)))
247 except StopIteration:
248 log.debug("No public key found, can't set autocrypt header.")
249 return mess_data
250
251 exported_key = gpg_provider.export_public_key(public_key)
252 autocrypt_data = AutocryptData(
253 addr=sender_email,
254 keydata=base64.b64encode(exported_key).decode("ascii"),
255 prefer_encrypt="mutual",
256 )
257
258 mess_data["extra"].setdefault("headers", {})[
259 "autocrypt"
260 ] = autocrypt_data.to_header()
261 return mess_data
262
263 def send_message_trigger(
264 self, client, mess_data, pre_xml_treatments, post_xml_treatments
265 ) -> Literal[True]:
266 """Process the XEP-0131 related data to be sent"""
267
268 def add_headers(mess_data: MessageData) -> MessageData:
269 extra = mess_data["extra"]
270 self.move_keywords_to_headers(extra)
271 # Now we parse headers, if any.
272 if "headers" in extra:
273 headers_data = HeadersData(**extra["headers"])
274 message_elt = mess_data["xml"]
275 message_elt.addChild(headers_data.to_element())
276 return mess_data
277
278 post_xml_treatments.addCallback(
279 lambda mess_data: defer.ensureDeferred(
280 self.add_autocrypt_header(client, mess_data)
281 )
282 )
283 return True