Mercurial > libervia-backend
view libervia/backend/plugins/plugin_sec_autocrypt.py @ 4351:6a0a081485b8
plugin autocrypt: Autocrypt protocol implementation:
Implementation of autocrypt: `autocrypt` header is checked, and if present and no public
key is known for the peer, the key is imported.
`autocrypt` header is also added to outgoing message (only if an email gateway is
detected).
For the moment, the JID is used as identifier, but the real email used by the gateway
should be used in the future.
rel 456
author | Goffi <goffi@goffi.org> |
---|---|
date | Fri, 28 Feb 2025 09:23:35 +0100 |
parents | |
children |
line wrap: on
line source
#!/usr/bin/env python3

# Libervia plugin
# Copyright (C) 2009-2025 Jérôme Poisson (goffi@goffi.org)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

import base64
from functools import partial
from typing import TYPE_CHECKING, cast
from typing import Literal

from pydantic import BaseModel, field_validator
from twisted.internet import defer
from twisted.words.protocols.jabber import jid
from twisted.words.xish import domish

from libervia.backend.core.constants import Const as C
from libervia.backend.core.core_types import MessageData, SatXMPPEntity
from libervia.backend.core.i18n import _
from libervia.backend.core.log import getLogger
from libervia.backend.memory import persistent
from libervia.backend.plugins.plugin_xep_0106 import XEP_0106
from libervia.backend.plugins.plugin_xep_0131 import XEP_0131
from libervia.backend.plugins.plugin_xep_0373 import get_gpg_provider
from libervia.backend.tools.common import regex

if TYPE_CHECKING:
    from libervia.backend.core.main import LiberviaBackend

log = getLogger(__name__)


PLUGIN_INFO = {
    C.PI_NAME: "Autocrypt",
    C.PI_IMPORT_NAME: "AUTOCRYPT",
    C.PI_TYPE: C.PLUG_TYPE_SEC,
    C.PI_MODES: C.PLUG_MODE_BOTH,
    C.PI_PROTOCOLS: [],
    C.PI_DEPENDENCIES: ["XEP-0106", "XEP-0131", "XEP-0373"],
    C.PI_RECOMMENDATIONS: [],
    C.PI_MAIN: "Autocrypt",
    C.PI_HANDLER: "no",
    C.PI_DESCRIPTION: _(
        "Autocrypt support, to automatically encrypt message to email gateways when "
        "suitable."
    ),
}


class AutocryptHeaderParseError(ValueError):
    """Raised when Autocrypt header parsing fails"""


class AutocryptData(BaseModel):
    """Parsed Autocrypt header data.

    @param addr: Email address for the key.
    @param keydata: Base64-encoded public key.
    @param prefer_encrypt: Encryption preference hint.
    """

    addr: str
    keydata: str
    prefer_encrypt: Literal["mutual"] | None = None

    @field_validator("addr")
    @classmethod
    def check_email(cls, value):
        """Strip the address and check that it looks like an email address."""
        value = value.strip()
        if not regex.RE_EMAIL.match(value):
            raise ValueError("Invalid email address")
        return value

    @field_validator("keydata")
    @classmethod
    def validate_keydata(cls, value: str) -> str:
        """Validate keydata is proper base64.

        Whitespace is removed before validation: email headers are commonly folded,
        which leaves spaces or line breaks inside the base64 payload, and strict
        base64 decoding rejects them.

        @param value: Raw keydata attribute value.
        @return: Base64 keydata with all whitespace removed.
        @raise ValueError: The value is not valid base64.
        """
        value = "".join(value.split())
        try:
            base64.b64decode(value, validate=True)
        except ValueError as e:
            raise ValueError("Invalid base64 in keydata") from e
        return value

    def to_header(self) -> str:
        """Generate the Autocrypt header.

        @return: Formatted header value per Autocrypt specification.
        """
        parts = [f"addr={self.addr}", f"keydata={self.keydata}"]
        if self.prefer_encrypt is not None:
            parts.append(f"prefer-encrypt={self.prefer_encrypt}")
        return "; ".join(parts)


def parse_autocrypt_header(header: str) -> AutocryptData:
    """Parse an Autocrypt header.

    Parts without a ``=`` and attributes other than ``addr``, ``keydata`` and
    ``prefer-encrypt`` are ignored. A ``prefer-encrypt`` value other than ``mutual``
    is treated as if the attribute was absent, instead of invalidating the header.

    @param header: Raw Autocrypt header value
    @return: Parsed AutocryptData.
    @raise AutocryptHeaderParseError: Some required field is invalid or missing.
    """
    attributes: dict[str, str] = {}
    for part in header.split(";"):
        key, separator, value = part.partition("=")
        if not separator:
            # Empty or invalid part, ignore it.
            continue
        key = key.strip().lower()
        if key in {"addr", "keydata", "prefer-encrypt"}:
            attributes[key] = value.strip()

    if not attributes.get("addr"):
        raise AutocryptHeaderParseError('Missing required "addr" attribute')
    if not attributes.get("keydata"):
        raise AutocryptHeaderParseError('Missing required "keydata" attribute')

    # Only "mutual" is meaningful; anything else means "no preference".
    prefer_encrypt = "mutual" if attributes.get("prefer-encrypt") == "mutual" else None

    try:
        return AutocryptData(
            addr=attributes["addr"],
            keydata=attributes["keydata"],
            prefer_encrypt=prefer_encrypt,
        )
    except ValueError as e:
        raise AutocryptHeaderParseError(f"Invalid Autocrypt header: {e}") from e


class Autocrypt:
    """Import public keys from received Autocrypt headers and add one when sending."""

    def __init__(self, host: "LiberviaBackend") -> None:
        self.host = host
        log.info(f"plugin {PLUGIN_INFO[C.PI_NAME]!r} initialization")
        self._shim = cast(XEP_0131, host.plugins["XEP-0131"])
        self._e = cast(XEP_0106, host.plugins["XEP-0106"])
        host.trigger.add("message_received", self.message_received_trigger)
        host.trigger.add("sendMessage", self.send_message_trigger)

    def profile_connecting(self, client: SatXMPPEntity) -> None:
        """Attach per-session Autocrypt state to the client."""
        # Sender already handled.
        client._autocrypt_seen: set[str] = set()
        client._autocrypt_gpg_provider = get_gpg_provider(self.host, client)

    async def handle_autocrypt_data(
        self, client: SatXMPPEntity, mess_data: MessageData, autocrypt_data_raw: str
    ) -> None:
        """Process Autocrypt header from XMPP email gateway

        The header is ignored (with a log entry) when the sender is not a bare JID
        with a localpart on an SMTP gateway, when the header is invalid, or when its
        address doesn't match the sender. Otherwise the key is imported if no public
        key is known yet for the sender.

        @param client: Client session.
        @param mess_data: Message data.
        @param autocrypt_data_raw: Raw Autocrypt header value
        """
        from_jid = mess_data["from"]
        # For a received message the gateway is on the sender side, and the sender
        # email is derived from the sender localpart below, so the identity check is
        # done on the sender domain.
        if from_jid.resource or not from_jid.user:
            log.warning("Ignoring Autocrypt header from non email gateway.")
            return
        is_email_gateway = await self.host.memory.disco.has_identity(
            client, "gateway", "smtp", jid.JID(from_jid.host)
        )
        if not is_email_gateway:
            log.warning("Ignoring Autocrypt header from non email gateway.")
            return

        try:
            autocrypt_data = parse_autocrypt_header(autocrypt_data_raw)
        except AutocryptHeaderParseError as e:
            log.error(f"Invalid Autocrypt header: {e}")
            return

        sender_email = self._e.unescape(from_jid.user)
        # Compared case-insensitively: the header address may use a different case
        # than the one found in the JID.
        if sender_email.lower() != autocrypt_data.addr.lower():
            log.warning(
                f"Sender email ({sender_email!r}) doesn't match autocrypt header address"
                f" ({autocrypt_data.addr!r}), ignoring autocrypt data."
            )
            return

        if sender_email in client._autocrypt_seen:
            log.debug(f"We have already handled {sender_email!r} , nothing to do.")
            return

        gpg_provider = client._autocrypt_gpg_provider
        public_keys = gpg_provider.list_public_keys(sender_email)
        if not public_keys:
            log.debug(
                f"No public key found for {sender_email!r}, importing autocrypt data."
            )
            # FIXME: Maybe we should import the Autocrypt key in a separated location?
            #   Autocrypt is less secure than normal key management.
            gpg_provider.import_public_key(base64.b64decode(autocrypt_data.keydata))
        else:
            log.debug(
                f"There are already known public key for {sender_email}, we skipt "
                "autocrypt"
            )

        client._autocrypt_seen.add(sender_email)

    def _check_headers(
        self, client: SatXMPPEntity, mess_data: MessageData
    ) -> MessageData:
        """Start Autocrypt handling if the message has an ``autocrypt`` header.

        Handling is launched without being awaited, so the message workflow is not
        delayed.

        @param client: Client session.
        @param mess_data: Message data.
        @return: The message data, unmodified.
        """
        try:
            autocrypt_data = mess_data["extra"]["headers"]["autocrypt"]
        except KeyError:
            pass
        else:
            defer.ensureDeferred(
                self.handle_autocrypt_data(client, mess_data, autocrypt_data)
            )
        return mess_data

    def message_received_trigger(
        self,
        client: SatXMPPEntity,
        message_elt: domish.Element,
        post_treat: defer.Deferred,
    ) -> Literal[True]:
        """Schedule the Autocrypt header check on received messages."""
        post_treat.addCallback(partial(self._check_headers, client))
        return True

    async def add_autocrypt_header(
        self, client: SatXMPPEntity, mess_data: MessageData
    ) -> MessageData:
        """Add an ``autocrypt`` header if the recipient is on an SMTP gateway.

        Nothing is added when the recipient domain is not an SMTP gateway, or when no
        public key is found for the sender.

        @param client: Client session.
        @param mess_data: Message data.
        @return: The message data, with the header set in ``extra`` when relevant.
        """
        to_jid = mess_data["to"]
        if await self.host.memory.disco.has_identity(
            client, "gateway", "smtp", jid.JID(to_jid.host)
        ):
            gpg_provider = client._autocrypt_gpg_provider
            # FIXME! We currently use from jid as email, but we would need to get sender
            #   email from gateway instead, as we don't know what is actually used, and
            #   it may differ from the JID.
            sender_email = mess_data["from"].userhost()
            try:
                public_key = next(iter(gpg_provider.list_public_keys(sender_email)))
            except StopIteration:
                log.debug("No public key found, can't set autocrypt header.")
                return mess_data
            exported_key = gpg_provider.export_public_key(public_key)
            autocrypt_data = AutocryptData(
                addr=sender_email,
                keydata=base64.b64encode(exported_key).decode("ascii"),
                prefer_encrypt="mutual",
            )

            mess_data["extra"].setdefault("headers", {})[
                "autocrypt"
            ] = autocrypt_data.to_header()
        return mess_data

    def send_message_trigger(
        self, client, mess_data, pre_xml_treatments, post_xml_treatments
    ) -> Literal[True]:
        """Schedule the addition of the Autocrypt header on sent messages."""
        post_xml_treatments.addCallback(
            lambda mess_data: defer.ensureDeferred(
                self.add_autocrypt_header(client, mess_data)
            )
        )
        return True