Mercurial > libervia-backend
view libervia/backend/plugins/plugin_sec_autocrypt.py @ 4352:382dc6e62b6e default tip
doc (components, encryption): add documentation on autocrypt for client and email gateway:
fix 456
author | Goffi <goffi@goffi.org> |
---|---|
date | Fri, 28 Feb 2025 09:23:35 +0100 |
parents | 6a0a081485b8 |
children |
line wrap: on
line source
#!/usr/bin/env python3

# Libervia plugin
# Copyright (C) 2009-2025 Jérôme Poisson (goffi@goffi.org)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

import base64
from functools import partial
from typing import TYPE_CHECKING, cast
from typing import Literal

from pydantic import BaseModel, field_validator
from twisted.internet import defer
from twisted.words.protocols.jabber import jid
from twisted.words.xish import domish

from libervia.backend.core.constants import Const as C
from libervia.backend.core.core_types import MessageData, SatXMPPEntity
from libervia.backend.core.i18n import _
from libervia.backend.core.log import getLogger
from libervia.backend.memory import persistent
from libervia.backend.plugins.plugin_xep_0106 import XEP_0106
from libervia.backend.plugins.plugin_xep_0131 import XEP_0131
from libervia.backend.plugins.plugin_xep_0373 import get_gpg_provider
from libervia.backend.tools.common import regex

if TYPE_CHECKING:
    from libervia.backend.core.main import LiberviaBackend

log = getLogger(__name__)


PLUGIN_INFO = {
    C.PI_NAME: "Autocrypt",
    C.PI_IMPORT_NAME: "AUTOCRYPT",
    C.PI_TYPE: C.PLUG_TYPE_SEC,
    C.PI_MODES: C.PLUG_MODE_BOTH,
    C.PI_PROTOCOLS: [],
    C.PI_DEPENDENCIES: ["XEP-0106", "XEP-0131", "XEP-0373"],
    C.PI_RECOMMENDATIONS: [],
    C.PI_MAIN: "Autocrypt",
    C.PI_HANDLER: "no",
    C.PI_DESCRIPTION: _(
        "Autocrypt support, to automatically encrypt message to email gateways when "
        "suitable."
    ),
}


class AutocryptHeaderParseError(ValueError):
    """Raised when Autocrypt header parsing fails"""


class AutocryptData(BaseModel):
    """Parsed Autocrypt header data.

    @param addr: Email address for the key.
    @param keydata: Base64-encoded public key.
    @param prefer_encrypt: Encryption preference hint.
    """

    addr: str
    keydata: str
    prefer_encrypt: Literal["mutual"] | None = None

    @field_validator("addr")
    @classmethod
    def check_email(cls, value):
        """Strip the address and check that it looks like an email address.

        @param value: Raw ``addr`` value.
        @return: Stripped address.
        @raise ValueError: The value doesn't match ``regex.RE_EMAIL``.
        """
        value = value.strip()
        if not regex.RE_EMAIL.match(value):
            raise ValueError("Invalid email address")
        return value

    @field_validator("keydata")
    @classmethod
    def validate_keydata(cls, value: str) -> str:
        """Validate keydata is proper base64

        @param value: Raw ``keydata`` value.
        @return: Key data with all whitespace removed.
        @raise ValueError: The value is not valid base64.
        """
        # Whitespace must be removed *before* the strict check: "validate=True"
        # rejects any non-alphabet character, and a header value which has been
        # folded on several lines keeps whitespace inside the base64 payload.
        value = "".join(value.split())
        try:
            base64.b64decode(value, validate=True)
        except ValueError as e:
            # binascii.Error is a ValueError subclass.
            raise ValueError("Invalid base64 in keydata") from e
        return value

    def to_header(self) -> str:
        """Generate the Autocrypt header.

        @return: Formatted header value per Autocrypt specification.
        """
        parts = [f"addr={self.addr}", f"keydata={self.keydata}"]
        if self.prefer_encrypt is not None:
            parts.append(f"prefer-encrypt={self.prefer_encrypt}")
        return "; ".join(parts)


def parse_autocrypt_header(header: str) -> AutocryptData:
    """Parse an Autocrypt header.

    Attributes are separated by ";" and written as "key=value". Parts without "="
    and unknown attribute names are ignored; if an attribute is repeated, the last
    occurrence wins.

    @param header: Raw Autocrypt header value
    @return: Parsed AutocryptData.
    @raise AutocryptHeaderParseError: Some required field is invalid or missing.
    """
    attributes = {}
    for part in header.split(";"):
        part = part.strip()
        if not part:
            continue
        if "=" not in part:
            # Ignore invalid parts
            continue
        # Split on the first "=" only: base64 padding may add "=" in the value.
        key, value = part.split("=", 1)
        key = key.strip().lower()
        value = value.strip()
        if key in {"addr", "keydata", "prefer-encrypt"}:
            attributes[key] = value

    if not attributes.get("addr"):
        raise AutocryptHeaderParseError('Missing required "addr" attribute')
    if not attributes.get("keydata"):
        raise AutocryptHeaderParseError('Missing required "keydata" attribute')

    try:
        return AutocryptData(
            addr=attributes["addr"],
            keydata=attributes["keydata"],
            prefer_encrypt=attributes.get("prefer-encrypt"),
        )
    except ValueError as e:
        # pydantic's validation error is a ValueError subclass.
        raise AutocryptHeaderParseError(f"Invalid Autocrypt header: {e}") from e


class Autocrypt:
    """Import keys from received Autocrypt headers, and add one to sent messages.

    Only messages exchanged with an entity advertising a "gateway/smtp" identity
    are handled.
    """

    def __init__(self, host: "LiberviaBackend") -> None:
        self.host = host
        log.info(f"plugin {PLUGIN_INFO[C.PI_NAME]!r} initialization")
        self._shim = cast(XEP_0131, host.plugins["XEP-0131"])
        self._e = cast(XEP_0106, host.plugins["XEP-0106"])
        host.trigger.add("message_received", self.message_received_trigger)
        host.trigger.add("sendMessage", self.send_message_trigger)

    def profile_connecting(self, client: SatXMPPEntity) -> None:
        """Initialise the per-session Autocrypt state on the client.

        @param client: Client session.
        """
        # Sender already handled.
        client._autocrypt_seen: set[str] = set()
        client._autocrypt_gpg_provider = get_gpg_provider(self.host, client)

    async def handle_autocrypt_data(
        self, client: SatXMPPEntity, mess_data: MessageData, autocrypt_data_raw: str
    ) -> None:
        """Process Autocrypt header from XMPP email gateway

        The key is imported only if the message comes from an email gateway, if the
        header address matches the sender, and if no public key is already known for
        this address. An invalid header is logged and ignored, nothing is raised.

        @param client: Client session.
        @param mess_data: Message data.
        @param autocrypt_data_raw: Raw Autocrypt header value
        """
        # The gateway is the *sender* of a received message: the sender email is
        # read from the user part of "from", so the entity which must be an email
        # gateway is the host of "from" (not the recipient, which is ourself).
        from_jid = mess_data["from"]
        is_email_gateway = await self.host.memory.disco.has_identity(
            client, "gateway", "smtp", jid.JID(from_jid.host)
        )
        if from_jid.resource or not is_email_gateway:
            log.warning("Ignoring Autocrypt header from non email gateway.")
            return

        try:
            autocrypt_data = parse_autocrypt_header(autocrypt_data_raw)
        except AutocryptHeaderParseError as e:
            log.error(f"Invalid Autocrypt header: {e}")
            return

        # The email address is escaped (XEP-0106) in the user part of the JID.
        sender_email = self._e.unescape(from_jid.user)
        if sender_email != autocrypt_data.addr:
            log.warning(
                f"Sender email ({sender_email!r}) doesn't match autocrypt header address"
                f" ({autocrypt_data.addr!r}), ignoring autocrypt data."
            )
            return

        if sender_email in client._autocrypt_seen:
            log.debug(f"We have already handled {sender_email!r} , nothing to do.")
            return

        gpg_provider = client._autocrypt_gpg_provider
        public_keys = gpg_provider.list_public_keys(sender_email)
        if not public_keys:
            log.debug(
                f"No public key found for {sender_email!r}, importing autocrypt data."
            )
            # FIXME: Maybe we should import the Autocrypt key in a separated location?
            #   Autocrypt is less secure than normal key management.
            gpg_provider.import_public_key(base64.b64decode(autocrypt_data.keydata))
        else:
            log.debug(
                f"There are already known public key for {sender_email}, we skipt "
                "autocrypt"
            )

        client._autocrypt_seen.add(sender_email)

    def _check_headers(
        self, client: SatXMPPEntity, mess_data: MessageData
    ) -> MessageData:
        """Launch Autocrypt handling if the message has an "autocrypt" header.

        The handling runs in the background, the message data is returned
        immediately and unmodified.

        @param client: Client session.
        @param mess_data: Message data.
        @return: The same message data.
        """
        try:
            autocrypt_data = mess_data["extra"]["headers"]["autocrypt"]
        except KeyError:
            pass
        else:
            d = defer.ensureDeferred(
                self.handle_autocrypt_data(client, mess_data, autocrypt_data)
            )
            # Nobody waits for this Deferred: without an errback, a failure would
            # only show up as an "Unhandled error in Deferred".
            d.addErrback(
                lambda failure: log.error(
                    f"Can't handle Autocrypt header: {failure.value}"
                )
            )

        return mess_data

    def message_received_trigger(
        self,
        client: SatXMPPEntity,
        message_elt: domish.Element,
        post_treat: defer.Deferred,
    ) -> Literal[True]:
        """Schedule the Autocrypt header check once the message is parsed.

        @param client: Client session.
        @param message_elt: Received <message> element (unused here).
        @param post_treat: Deferred fired with the parsed message data.
        @return: True, to continue the normal workflow.
        """
        post_treat.addCallback(partial(self._check_headers, client))
        return True

    async def add_autocrypt_header(self, client, mess_data: MessageData) -> MessageData:
        """Add an "autocrypt" header when the recipient is behind an email gateway.

        This is best-effort: if no public key is found for the sender, or if the
        header can't be built, the message data is returned without the header.

        @param client: Client session.
        @param mess_data: Message data.
        @return: Message data, with the header set in ``extra["headers"]`` if possible.
        """
        to_jid = mess_data["to"]
        is_email_gateway = await self.host.memory.disco.has_identity(
            client, "gateway", "smtp", jid.JID(to_jid.host)
        )
        if not is_email_gateway:
            return mess_data

        gpg_provider = client._autocrypt_gpg_provider
        # FIXME! We currently use from jid as email, but we would need to get sender
        #   email from gateway instead, as we don't know what is actually used, and it
        #   may differ from the JID.
        sender_email = mess_data["from"].userhost()
        try:
            public_key = next(iter(gpg_provider.list_public_keys(sender_email)))
        except StopIteration:
            log.debug("No public key found, can't set autocrypt header.")
            return mess_data

        exported_key = gpg_provider.export_public_key(public_key)
        try:
            autocrypt_data = AutocryptData(
                addr=sender_email,
                keydata=base64.b64encode(exported_key).decode("ascii"),
                prefer_encrypt="mutual",
            )
        except ValueError as e:
            # Validation fails e.g. when the sender JID is not a valid email
            # address. The header is optional, it must not prevent the sending.
            log.warning(f"Can't build autocrypt header for {sender_email!r}: {e}")
            return mess_data

        mess_data["extra"].setdefault("headers", {})[
            "autocrypt"
        ] = autocrypt_data.to_header()
        return mess_data

    def send_message_trigger(
        self, client, mess_data, pre_xml_treatments, post_xml_treatments
    ) -> Literal[True]:
        """Schedule the addition of the Autocrypt header to the message to send.

        @param client: Client session.
        @param mess_data: Message data (unused here, the data to update is the one
            given to the ``post_xml_treatments`` callback).
        @param pre_xml_treatments: Unused.
        @param post_xml_treatments: Deferred chain to which the header addition is
            appended.
        @return: True, to continue the normal workflow.
        """
        post_xml_treatments.addCallback(
            lambda mess_data: defer.ensureDeferred(
                self.add_autocrypt_header(client, mess_data)
            )
        )
        return True