changeset 4351:6a0a081485b8

plugin autocrypt: Autocrypt protocol implementation: Implementation of autocrypt: `autocrypt` header is checked, and if present and no public key is known for the peer, the key is imported. `autocrypt` header is also added to outgoing message (only if an email gateway is detected). For the moment, the JID is use as identifier, but the real email used by gateway should be used in the future. rel 456
author Goffi <goffi@goffi.org>
date Fri, 28 Feb 2025 09:23:35 +0100
parents 6baea959dc33
children 382dc6e62b6e
files libervia/backend/plugins/plugin_sec_autocrypt.py
diffstat 1 files changed, 283 insertions(+), 0 deletions(-) [+]
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/libervia/backend/plugins/plugin_sec_autocrypt.py	Fri Feb 28 09:23:35 2025 +0100
@@ -0,0 +1,283 @@
+#!/usr/bin/env python3
+
+# Libervia plugin
+# Copyright (C) 2009-2025 Jérôme Poisson (goffi@goffi.org)
+
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU Affero General Public License for more details.
+
+# You should have received a copy of the GNU Affero General Public License
+# along with this program.  If not, see <http://www.gnu.org/licenses/>.
+
+import base64
+from functools import partial
+from typing import TYPE_CHECKING, cast
+from typing import Literal
+
+from pydantic import BaseModel, field_validator
+from twisted.internet import defer
+from twisted.words.protocols.jabber import jid
+from twisted.words.xish import domish
+
+from libervia.backend.core.constants import Const as C
+from libervia.backend.core.core_types import MessageData, SatXMPPEntity
+from libervia.backend.core.i18n import _
+from libervia.backend.core.log import getLogger
+from libervia.backend.memory import persistent
+from libervia.backend.plugins.plugin_xep_0106 import XEP_0106
+from libervia.backend.plugins.plugin_xep_0131 import XEP_0131
+from libervia.backend.plugins.plugin_xep_0373 import get_gpg_provider
+from libervia.backend.tools.common import regex
+
+if TYPE_CHECKING:
+    from libervia.backend.core.main import LiberviaBackend
+
+log = getLogger(__name__)
+
+
+PLUGIN_INFO = {
+    C.PI_NAME: "Autocrypt",
+    C.PI_IMPORT_NAME: "AUTOCRYPT",
+    C.PI_TYPE: C.PLUG_TYPE_SEC,
+    C.PI_MODES: C.PLUG_MODE_BOTH,
+    C.PI_PROTOCOLS: [],
+    C.PI_DEPENDENCIES: ["XEP-0106", "XEP-0131", "XEP-0373"],
+    C.PI_RECOMMENDATIONS: [],
+    C.PI_MAIN: "Autocrypt",
+    C.PI_HANDLER: "no",
+    C.PI_DESCRIPTION: _(
+        "Autocrypt support, to automatically encrypt message to email gateways when "
+        "suitable."
+    ),
+}
+
+
+class AutocryptHeaderParseError(ValueError):
+    """Raised when Autocrypt header parsing fails"""
+
+
+class AutocryptData(BaseModel):
+    """Parsed Autocrypt header data.
+
+    @param addr: Email address for the key.
+    @param keydata: Base64-encoded public key.
+    @param prefer_encrypt: Encryption preference hint.
+    """
+
+    addr: str
+    keydata: str
+    prefer_encrypt: Literal["mutual"] | None = None
+
+    @field_validator("addr")
+    @classmethod
+    def check_email(cls, value):
+        value = value.strip()
+        if not regex.RE_EMAIL.match(value):
+            raise ValueError("Invalid email address")
+        return value
+
+    @field_validator("keydata")
+    @classmethod
+    def validate_keydata(cls, value: str) -> str:
+        """Validate keydata is proper base64"""
+        try:
+            base64.b64decode(value, validate=True)
+        except ValueError as e:
+            raise ValueError("Invalid base64 in keydata") from e
+        return value.strip()
+
+    def to_header(self) -> str:
+        """Generate the Autocrypt header.
+
+        @return: Formatted header value per Autocrypt specification.
+        """
+        parts = [f"addr={self.addr}", f"keydata={self.keydata}"]
+
+        if self.prefer_encrypt is not None:
+            parts.append(f"prefer-encrypt={self.prefer_encrypt}")
+
+        return "; ".join(parts)
+
+
+def parse_autocrypt_header(header: str) -> AutocryptData:
+    """Parse an Autocrypt header.
+
+    @param header: Raw Autocrypt header value
+    @return: Parsed AutocryptData.
+    @raise AutocryptHeaderParseError: Some required field is invalid or missing.
+    """
+    attributes = {}
+    for part in header.split(";"):
+        part = part.strip()
+        if not part:
+            continue
+        if "=" not in part:
+            # Ignore invalid parts
+            continue
+        key, value = part.split("=", 1)
+        key = key.strip().lower()
+        value = value.strip()
+
+        if key in {"addr", "keydata", "prefer-encrypt"}:
+            attributes[key] = value
+
+    if not attributes.get("addr"):
+        raise AutocryptHeaderParseError('Missing required "addr" attribute')
+    if not attributes.get("keydata"):
+        raise AutocryptHeaderParseError('Missing required "keydata" attribute')
+
+    try:
+        return AutocryptData(
+            addr=attributes["addr"],
+            keydata=attributes["keydata"],
+            prefer_encrypt=attributes.get("prefer-encrypt"),
+        )
+    except ValueError as e:
+        raise AutocryptHeaderParseError(f"Invalid Autocrypt header: {e}") from e
+
+
+class Autocrypt:
+
+    def __init__(self, host: "LiberviaBackend") -> None:
+        self.host = host
+        log.info(f"plugin {PLUGIN_INFO[C.PI_NAME]!r} initialization")
+        self._shim = cast(XEP_0131, host.plugins["XEP-0131"])
+        self._e = cast(XEP_0106, host.plugins["XEP-0106"])
+        host.trigger.add("message_received", self.message_received_trigger)
+        host.trigger.add("sendMessage", self.send_message_trigger)
+
+    def profile_connecting(self, client: SatXMPPEntity) -> None:
+        # Sender already handled.
+        client._autocrypt_seen: set[str] = set()
+        client._autocrypt_gpg_provider = get_gpg_provider(self.host, client)
+
+    async def handle_autocrypt_data(
+        self, client: SatXMPPEntity, mess_data: MessageData, autocrypt_data_raw: str
+    ) -> None:
+        """Process Autocrypt header from XMPP email gateway
+
+        @param client: Client session.
+        @param mess_data: Message data.
+        @param autocrypt_data: Raw Autocrypt header value
+        @raise AutocryptHeaderParseError: For invalid header format
+        """
+        from_jid = mess_data["from"]
+        to_jid = mess_data["to"]
+        is_email_gateway = await self.host.memory.disco.has_identity(
+            client, "gateway", "smtp", jid.JID(to_jid.host)
+        )
+        if to_jid.resource or not is_email_gateway:
+            log.warning("Ignoring Autocrypt header from non email gateway.")
+            return
+
+        try:
+            autocrypt_data = parse_autocrypt_header(autocrypt_data_raw)
+        except AutocryptHeaderParseError as e:
+            log.error(f"Invalid Autocrypt header: {e}")
+            return
+
+        sender_email = self._e.unescape(from_jid.user)
+        if sender_email != autocrypt_data.addr:
+            log.warning(
+                f"Sender email ({sender_email!r}) doesn't match autocrypt header address"
+                f" ({autocrypt_data.addr!r}), ignoring autocrypt data."
+            )
+            return
+
+        if sender_email in client._autocrypt_seen:
+            log.debug(f"We have already handled {sender_email!r} , nothing to do.")
+            return None
+        gpg_provider = client._autocrypt_gpg_provider
+        public_keys = gpg_provider.list_public_keys(sender_email)
+        if not public_keys:
+            log.debug(
+                f"No public key found for {sender_email!r}, importing autocrypt data."
+            )
+            # FIXME: Maybe we should import the Autocrypt key in a separated location?
+            #   Autocrypt is less secure than normal key management.
+            gpg_provider.import_public_key(base64.b64decode(autocrypt_data.keydata))
+        else:
+            log.debug(
+                f"There are already known public key for {sender_email}, we skipt "
+                "autocrypt"
+            )
+        client._autocrypt_seen.add(sender_email)
+
+    def _check_headers(
+        self, client: SatXMPPEntity, mess_data: MessageData
+    ) -> MessageData:
+        try:
+            autocrypt_data = mess_data["extra"]["headers"]["autocrypt"]
+        except KeyError:
+            pass
+        else:
+            defer.ensureDeferred(
+                self.handle_autocrypt_data(client, mess_data, autocrypt_data)
+            )
+        return mess_data
+
+    def message_received_trigger(
+        self,
+        client: SatXMPPEntity,
+        message_elt: domish.Element,
+        post_treat: defer.Deferred,
+    ) -> Literal[True]:
+        post_treat.addCallback(partial(self._check_headers, client))
+        return True
+
+    async def add_autocrypt_header(self, client, mess_data: MessageData) -> MessageData:
+        to_jid = mess_data["to"]
+        if await self.host.memory.disco.has_identity(
+            client, "gateway", "smtp", jid.JID(to_jid.host)
+        ):
+            gpg_provider = client._autocrypt_gpg_provider
+            # FIXME! We currently use from jid as email, but we would need to get sender
+            # email from gateway instead, as we don't know what is actually used, and it
+            # may differ from the JID.
+            sender_email = mess_data["from"].userhost()
+            try:
+                public_key = next(iter(gpg_provider.list_public_keys(sender_email)))
+            except StopIteration:
+                log.debug("No public key found, can't set autocrypt header.")
+                return mess_data
+
+            exported_key = gpg_provider.export_public_key(public_key)
+            autocrypt_data = AutocryptData(
+                addr=sender_email,
+                keydata=base64.b64encode(exported_key).decode("ascii"),
+                prefer_encrypt="mutual",
+            )
+
+            mess_data["extra"].setdefault("headers", {})[
+                "autocrypt"
+            ] = autocrypt_data.to_header()
+        return mess_data
+
+    def send_message_trigger(
+        self, client, mess_data, pre_xml_treatments, post_xml_treatments
+    ) -> Literal[True]:
+        """Process the XEP-0131 related data to be sent"""
+
+        def add_headers(mess_data: MessageData) -> MessageData:
+            extra = mess_data["extra"]
+            self.move_keywords_to_headers(extra)
+            # Now we parse headers, if any.
+            if "headers" in extra:
+                headers_data = HeadersData(**extra["headers"])
+                message_elt = mess_data["xml"]
+                message_elt.addChild(headers_data.to_element())
+            return mess_data
+
+        post_xml_treatments.addCallback(
+            lambda mess_data: defer.ensureDeferred(
+                self.add_autocrypt_header(client, mess_data)
+            )
+        )
+        return True