view libervia/backend/plugins/plugin_sec_autocrypt.py @ 4351:6a0a081485b8

plugin autocrypt: Autocrypt protocol implementation: Implementation of autocrypt: `autocrypt` header is checked, and if present and no public key is known for the peer, the key is imported. `autocrypt` header is also added to outgoing message (only if an email gateway is detected). For the moment, the JID is used as identifier, but the real email used by gateway should be used in the future. rel 456
author Goffi <goffi@goffi.org>
date Fri, 28 Feb 2025 09:23:35 +0100
parents
children
line wrap: on
line source

#!/usr/bin/env python3

# Libervia plugin
# Copyright (C) 2009-2025 Jérôme Poisson (goffi@goffi.org)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

import base64
from functools import partial
from typing import TYPE_CHECKING, cast
from typing import Literal

from pydantic import BaseModel, field_validator
from twisted.internet import defer
from twisted.words.protocols.jabber import jid
from twisted.words.xish import domish

from libervia.backend.core.constants import Const as C
from libervia.backend.core.core_types import MessageData, SatXMPPEntity
from libervia.backend.core.i18n import _
from libervia.backend.core.log import getLogger
from libervia.backend.memory import persistent
from libervia.backend.plugins.plugin_xep_0106 import XEP_0106
from libervia.backend.plugins.plugin_xep_0131 import XEP_0131
from libervia.backend.plugins.plugin_xep_0373 import get_gpg_provider
from libervia.backend.tools.common import regex

if TYPE_CHECKING:
    from libervia.backend.core.main import LiberviaBackend

# Module-level logger, named after this module.
log = getLogger(__name__)


# Plugin metadata, keyed by constants from ``Const``.
# NOTE(review): presumably consumed by the Libervia plugin loader, which is not visible
#   in this file — confirm the exact meaning of each key there.
PLUGIN_INFO = {
    C.PI_NAME: "Autocrypt",
    C.PI_IMPORT_NAME: "AUTOCRYPT",
    C.PI_TYPE: C.PLUG_TYPE_SEC,
    C.PI_MODES: C.PLUG_MODE_BOTH,
    C.PI_PROTOCOLS: [],
    # XEP-0106 and XEP-0131 are looked up in ``host.plugins`` by the ``Autocrypt``
    # class; ``get_gpg_provider`` is imported from the XEP-0373 plugin module.
    C.PI_DEPENDENCIES: ["XEP-0106", "XEP-0131", "XEP-0373"],
    C.PI_RECOMMENDATIONS: [],
    # Same name as the ``Autocrypt`` class defined in this module.
    C.PI_MAIN: "Autocrypt",
    C.PI_HANDLER: "no",
    C.PI_DESCRIPTION: _(
        "Autocrypt support, to automatically encrypt message to email gateways when "
        "suitable."
    ),
}


class AutocryptHeaderParseError(ValueError):
    """Error signalling that an Autocrypt header could not be parsed.

    It derives from ``ValueError``, so handlers catching ``ValueError`` also catch it.
    """


class AutocryptData(BaseModel):
    """Parsed Autocrypt header data.

    @param addr: Email address for the key.
    @param keydata: Base64-encoded public key (stored without any whitespace).
    @param prefer_encrypt: Encryption preference hint.
    """

    addr: str
    keydata: str
    prefer_encrypt: Literal["mutual"] | None = None

    @field_validator("addr")
    @classmethod
    def check_email(cls, value: str) -> str:
        """Validate that ``addr`` looks like an email address.

        @param value: Raw address.
        @return: Address with surrounding whitespace removed.
        @raise ValueError: The address doesn't match ``regex.RE_EMAIL``.
        """
        value = value.strip()
        if not regex.RE_EMAIL.match(value):
            raise ValueError("Invalid email address")
        return value

    @field_validator("keydata")
    @classmethod
    def validate_keydata(cls, value: str) -> str:
        """Validate keydata is proper base64.

        @param value: Raw keydata, possibly containing whitespace.
        @return: Keydata with all whitespace removed.
        @raise ValueError: The keydata is not valid base64.
        """
        # Whitespace must be removed *before* the strict check: with ``validate=True``
        # any character outside the base64 alphabet (spaces and line breaks from header
        # folding included) is rejected.
        normalized = "".join(value.split())
        try:
            base64.b64decode(normalized, validate=True)
        except ValueError as e:
            raise ValueError("Invalid base64 in keydata") from e
        return normalized

    def to_header(self) -> str:
        """Generate the Autocrypt header.

        @return: Formatted header value per Autocrypt specification.
        """
        parts = [f"addr={self.addr}"]

        if self.prefer_encrypt is not None:
            parts.append(f"prefer-encrypt={self.prefer_encrypt}")

        # The Autocrypt specification requires ``keydata`` to be the last attribute.
        parts.append(f"keydata={self.keydata}")

        return "; ".join(parts)


def parse_autocrypt_header(header: str) -> AutocryptData:
    """Build an ``AutocryptData`` from a raw Autocrypt header value.

    The header is a ";"-separated list of ``name=value`` attributes. Only ``addr``,
    ``keydata`` and ``prefer-encrypt`` are kept; names are matched case-insensitively
    and anything else is skipped.

    @param header: Raw Autocrypt header value
    @return: Parsed AutocryptData.
    @raise AutocryptHeaderParseError: Some required field is invalid or missing.
    """
    known_names = ("addr", "keydata", "prefer-encrypt")
    parsed: dict[str, str] = {}

    for chunk in header.split(";"):
        # Only the first "=" separates name and value: base64 padding may add more.
        name, separator, raw_value = chunk.strip().partition("=")
        if not separator:
            # Empty chunk, or chunk without any "=": nothing to extract.
            continue
        name = name.strip().lower()
        if name in known_names:
            parsed[name] = raw_value.strip()

    required = (
        ("addr", 'Missing required "addr" attribute'),
        ("keydata", 'Missing required "keydata" attribute'),
    )
    for name, message in required:
        if not parsed.get(name):
            raise AutocryptHeaderParseError(message)

    try:
        data = AutocryptData(
            addr=parsed["addr"],
            keydata=parsed["keydata"],
            prefer_encrypt=parsed.get("prefer-encrypt"),
        )
    except ValueError as exc:
        raise AutocryptHeaderParseError(f"Invalid Autocrypt header: {exc}") from exc
    return data


class Autocrypt:
    """Autocrypt support for messages exchanged with email gateways.

    On reception, the ``autocrypt`` header of a message is used to import the sender's
    public key when no key is known for this sender. On sending, an ``autocrypt`` header
    advertising our own public key is added to messages addressed to an email gateway.
    """

    def __init__(self, host: "LiberviaBackend") -> None:
        """Retrieve the plugins we depend on and register the message triggers.

        @param host: Backend instance.
        """
        self.host = host
        log.info(f"plugin {PLUGIN_INFO[C.PI_NAME]!r} initialization")
        self._shim = cast(XEP_0131, host.plugins["XEP-0131"])
        self._e = cast(XEP_0106, host.plugins["XEP-0106"])
        host.trigger.add("message_received", self.message_received_trigger)
        host.trigger.add("sendMessage", self.send_message_trigger)

    def profile_connecting(self, client: SatXMPPEntity) -> None:
        """Initialise the per-client Autocrypt state.

        @param client: Client session.
        """
        # Sender already handled.
        client._autocrypt_seen: set[str] = set()
        client._autocrypt_gpg_provider = get_gpg_provider(self.host, client)

    async def _is_email_gateway(self, client: SatXMPPEntity, entity_jid: jid.JID):
        """Tell if the host of a JID advertises a "gateway/smtp" disco identity.

        @param client: Client session.
        @param entity_jid: JID whose host must be checked.
        @return: Result of the disco identity check.
        """
        return await self.host.memory.disco.has_identity(
            client, "gateway", "smtp", jid.JID(entity_jid.host)
        )

    async def handle_autocrypt_data(
        self, client: SatXMPPEntity, mess_data: MessageData, autocrypt_data_raw: str
    ) -> None:
        """Process Autocrypt header from XMPP email gateway

        Invalid headers are logged and ignored, they don't raise.

        @param client: Client session.
        @param mess_data: Message data.
        @param autocrypt_data_raw: Raw Autocrypt header value
        """
        from_jid = mess_data["from"]
        to_jid = mess_data["to"]
        # For a received message the gateway is the *sender*: its user part is unescaped
        # below to get the email address, so the disco check is done on the sender host.
        is_email_gateway = await self._is_email_gateway(client, from_jid)
        # NOTE(review): messages addressed to a full JID (with resource) are skipped;
        #   confirm this is the intended filter.
        if to_jid.resource or not is_email_gateway:
            log.warning("Ignoring Autocrypt header from non email gateway.")
            return

        try:
            autocrypt_data = parse_autocrypt_header(autocrypt_data_raw)
        except AutocryptHeaderParseError as e:
            log.error(f"Invalid Autocrypt header: {e}")
            return

        sender_email = self._e.unescape(from_jid.user)
        if sender_email != autocrypt_data.addr:
            log.warning(
                f"Sender email ({sender_email!r}) doesn't match autocrypt header address"
                f" ({autocrypt_data.addr!r}), ignoring autocrypt data."
            )
            return

        if sender_email in client._autocrypt_seen:
            log.debug(f"We have already handled {sender_email!r} , nothing to do.")
            return None
        gpg_provider = client._autocrypt_gpg_provider
        public_keys = gpg_provider.list_public_keys(sender_email)
        if not public_keys:
            log.debug(
                f"No public key found for {sender_email!r}, importing autocrypt data."
            )
            # FIXME: Maybe we should import the Autocrypt key in a separated location?
            #   Autocrypt is less secure than normal key management.
            gpg_provider.import_public_key(base64.b64decode(autocrypt_data.keydata))
        else:
            log.debug(
                f"There are already known public key for {sender_email}, we skipt "
                "autocrypt"
            )
        # Whatever the outcome, this sender is not checked again for this session.
        client._autocrypt_seen.add(sender_email)

    def _check_headers(
        self, client: SatXMPPEntity, mess_data: MessageData
    ) -> MessageData:
        """Launch Autocrypt handling if the message has an ``autocrypt`` header.

        @param client: Client session.
        @param mess_data: Message data.
        @return: The message data, unmodified.
        """
        try:
            autocrypt_data = mess_data["extra"]["headers"]["autocrypt"]
        except KeyError:
            pass
        else:
            # The handling is started without being awaited: the message data is
            # returned immediately.
            defer.ensureDeferred(
                self.handle_autocrypt_data(client, mess_data, autocrypt_data)
            )
        return mess_data

    def message_received_trigger(
        self,
        client: SatXMPPEntity,
        message_elt: domish.Element,
        post_treat: defer.Deferred,
    ) -> Literal[True]:
        """Schedule the ``autocrypt`` header check on the post-treatment chain.

        @param client: Client session.
        @param message_elt: Received message element (not used here).
        @param post_treat: Deferred to which the header check is added as callback.
        @return: Always True.
        """
        post_treat.addCallback(partial(self._check_headers, client))
        return True

    async def add_autocrypt_header(
        self, client: SatXMPPEntity, mess_data: MessageData
    ) -> MessageData:
        """Add an ``autocrypt`` header if the message is sent to an email gateway.

        The message data is returned untouched when the recipient is not an email
        gateway, when we have no public key for the sender address, or when the header
        data can't be built.

        @param client: Client session.
        @param mess_data: Message data.
        @return: The message data, with the header set in ``extra`` when suitable.
        """
        if not await self._is_email_gateway(client, mess_data["to"]):
            return mess_data

        gpg_provider = client._autocrypt_gpg_provider
        # FIXME! We currently use from jid as email, but we would need to get sender
        # email from gateway instead, as we don't know what is actually used, and it
        # may differ from the JID.
        sender_email = mess_data["from"].userhost()
        public_key = next(iter(gpg_provider.list_public_keys(sender_email)), None)
        if public_key is None:
            log.debug("No public key found, can't set autocrypt header.")
            return mess_data

        exported_key = gpg_provider.export_public_key(public_key)
        try:
            autocrypt_data = AutocryptData(
                addr=sender_email,
                keydata=base64.b64encode(exported_key).decode("ascii"),
                prefer_encrypt="mutual",
            )
        except ValueError as e:
            # The header is a best-effort addition: a validation failure (e.g. a JID
            # which is not a valid email address) must not prevent sending the message.
            log.warning(f"Can't build autocrypt header for {sender_email!r}: {e}")
            return mess_data

        mess_data["extra"].setdefault("headers", {})[
            "autocrypt"
        ] = autocrypt_data.to_header()
        return mess_data

    def send_message_trigger(
        self, client, mess_data, pre_xml_treatments, post_xml_treatments
    ) -> Literal[True]:
        """Schedule the addition of the ``autocrypt`` header to an outgoing message.

        @param client: Client session.
        @param mess_data: Message data (not used directly here).
        @param pre_xml_treatments: Not used here.
        @param post_xml_treatments: Deferred to which the header addition is added as
            callback.
        @return: Always True.
        """
        post_xml_treatments.addCallback(
            lambda mess_data: defer.ensureDeferred(
                self.add_autocrypt_header(client, mess_data)
            )
        )
        return True