view libervia/backend/plugins/plugin_exp_gre.py @ 4351:6a0a081485b8

plugin autocrypt: Autocrypt protocol implementation: Implementation of autocrypt: `autocrypt` header is checked, and if present and no public key is known for the peer, the key is imported. `autocrypt` header is also added to outgoing messages (only if an email gateway is detected). For the moment, the JID is used as identifier, but the real email used by gateway should be used in the future. rel 456
author Goffi <goffi@goffi.org>
date Fri, 28 Feb 2025 09:23:35 +0100
parents 95f8309f86cf
children
line wrap: on
line source

#!/usr/bin/env python3

# Libervia plugin
# Copyright (C) 2009-2025 Jérôme Poisson (goffi@goffi.org)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

from abc import ABC, abstractmethod
from typing import Final, TYPE_CHECKING, Self, Type, cast

from twisted.internet import defer
from twisted.words.protocols.jabber import jid, error as jabber_error
from twisted.words.protocols.jabber import xmlstream
from twisted.words.protocols.jabber.xmlstream import XMPPHandler
from twisted.words.xish import domish
from wokkel import data_form, disco, iwokkel
from zope.interface import implementer

from libervia.backend.core import exceptions
from libervia.backend.core.constants import Const as C
from libervia.backend.core.core_types import SatXMPPEntity
from libervia.backend.core.i18n import _
from libervia.backend.core.log import getLogger
from libervia.backend.plugins.plugin_xep_0106 import XEP_0106
from libervia.backend.tools import xml_tools

if TYPE_CHECKING:
    from libervia.backend.core.main import LiberviaBackend

# Module-level logger, named after this module.
log = getLogger(__name__)


# Plugin metadata. "XEP-0106" is declared as a dependency because the GRE class
# retrieves it from ``host.plugins`` on initialisation; C.PI_MAIN names the plugin
# class defined in this module.
# NOTE(review): C.PI_HANDLER presumably tells the plugin loader that
# ``GRE.get_handler`` must be used — confirm against the loader.
PLUGIN_INFO = {
    C.PI_NAME: "Gateway Relayer Encryption",
    C.PI_IMPORT_NAME: "GRE",
    C.PI_TYPE: "XEP",
    C.PI_MODES: C.PLUG_MODE_BOTH,
    C.PI_PROTOCOLS: [],
    C.PI_DEPENDENCIES: ["XEP-0106"],
    C.PI_RECOMMENDATIONS: [],
    C.PI_MAIN: "GRE",
    C.PI_HANDLER: "yes",
    C.PI_DESCRIPTION: _(
        "Handle formatting and encryption to support end-to-end encryption with gateways."
    ),
}

# XML namespaces of the relayed encryption protocol; all of them derive from the
# same prefix.
NS_GRE_PREFIX: Final = "urn:xmpp:gre:"
# Namespace of the protocol itself.
NS_GRE: Final = NS_GRE_PREFIX + "0"
# Prefixes of the disco features advertising formatters and encrypters.
NS_GRE_FORMATTER_PREFIX: Final = NS_GRE_PREFIX + "formatter:"
NS_GRE_ENCRYPTER_PREFIX: Final = NS_GRE_PREFIX + "encrypter:"
# Namespace of the data form carrying relayed encryption data.
NS_GRE_DATA: Final = NS_GRE_PREFIX + "data"

# XPath-like expression matching IQ "get" requests holding a GRE <data/> element.
IQ_DATA_REQUEST = f'{C.IQ_GET}/data[@xmlns="{NS_GRE}"]'


class Formatter(ABC):
    """Base class of the payload formatters used for relayed encryption.

    Each subclass must set ``name`` and ``namespace``; it is then recorded in
    ``formatters_classes`` under its namespace. A subclass can only be instantiated
    once, and the instance is retrieved with ``get_instance``.
    """

    # Registry of the subclasses, indexed by their namespace.
    formatters_classes: dict[str, Type[Self]] = {}
    name: str = ""
    namespace: str = ""
    # Instance of the class, set on instantiation.
    _instance: Self | None = None

    def __init_subclass__(cls, **kwargs) -> None:
        """Validate a new subclass and add it to the formatters registry.

        @param kwargs: Additional keyword arguments, passed to the parent
            implementation.
        """
        assert cls.name and cls.namespace, "name and namespace must be set"
        super().__init_subclass__(**kwargs)
        registry = cls.formatters_classes
        registry[cls.namespace] = cls

    def __init__(self, host: "LiberviaBackend") -> None:
        """Store the instance on its class, and keep a reference to the backend.

        @param host: Libervia backend instance.
        """
        klass = type(self)
        assert klass._instance is None, "Formatter class must be singleton."
        klass._instance = self
        self.host = host

    @classmethod
    def get_instance(cls) -> Self:
        """Return the instance of this class.

        @return: The instance stored on instantiation.
        @raise exceptions.InternalError: The class has not been instantiated.
        """
        instance = cls._instance
        if instance is not None:
            return instance
        raise exceptions.InternalError("Formatter instance should be set.")

    @abstractmethod
    async def format(
        self,
        client: SatXMPPEntity,
        recipient_id: str,
        message_elt: domish.Element,
        encryption_data_form: data_form.Form,
    ) -> bytes:
        """Build the payload to encrypt from a message.

        @param client: Client session.
        @param recipient_id: Identifier of the recipient.
        @param message_elt: Message stanza to format.
        @param encryption_data_form: Relayed encryption data form.
        @return: Formatted payload.
        """
        raise NotImplementedError


class Encrypter(ABC):
    """Base class of the payload encrypters used for relayed encryption.

    Each subclass must set ``name`` and ``namespace``; it is then recorded in
    ``encrypters_classes`` under its namespace. A subclass can only be instantiated
    once, and the instance is retrieved with ``get_instance``.
    """

    # Registry of the subclasses, indexed by their namespace.
    encrypters_classes: dict[str, Type[Self]] = {}
    name: str = ""
    namespace: str = ""
    # Instance of the class, set on instantiation.
    _instance: Self | None = None

    def __init_subclass__(cls, **kwargs) -> None:
        """Validate a new subclass and add it to the encrypters registry.

        @param kwargs: Additional keyword arguments, passed to the parent
            implementation.
        """
        assert cls.name and cls.namespace, "name and namespace must be set"
        super().__init_subclass__(**kwargs)
        registry = cls.encrypters_classes
        registry[cls.namespace] = cls

    def __init__(self, host: "LiberviaBackend") -> None:
        """Store the instance on its class, and keep a reference to the backend.

        @param host: Libervia backend instance.
        """
        klass = type(self)
        assert klass._instance is None, "Encrypter class must be singleton."
        klass._instance = self
        self.host = host

    @classmethod
    def get_instance(cls) -> Self:
        """Return the instance of this class.

        @return: The instance stored on instantiation.
        @raise exceptions.InternalError: The class has not been instantiated.
        """
        instance = cls._instance
        if instance is not None:
            return instance
        raise exceptions.InternalError("Encrypter instance should be set.")

    @abstractmethod
    async def encrypt(
        self,
        client: SatXMPPEntity,
        recipient_id: str,
        message_elt: domish.Element,
        formatted_payload: bytes,
        encryption_data_form: data_form.Form,
    ) -> str:
        """Encrypt a formatted payload.

        @param client: Client session.
        @param recipient_id: Identifier of the recipient.
        @param message_elt: Message stanza being encrypted.
        @param formatted_payload: Payload to encrypt.
        @param encryption_data_form: Relayed encryption data form.
        @return: Encrypted payload.
        """
        raise NotImplementedError


class GetDataHandler(ABC):
    """Base class of the handlers answering relayed encryption data requests.

    Subclasses must define non-empty ``gre_formatters`` and ``gre_encrypters``.
    """

    gre_formatters: list[str] = []
    gre_encrypters: list[str] = []

    def __init_subclass__(cls, **kwargs):
        """Check that the new subclass declares formatters and encrypters.

        @param kwargs: Additional keyword arguments, passed to the parent
            implementation.
        @raise TypeError: ``gre_formatters`` or ``gre_encrypters`` is empty.
        """
        super().__init_subclass__(**kwargs)
        if cls.gre_formatters and cls.gre_encrypters:
            return
        raise TypeError(
            f'{cls.__name__} must define "gre_formatters" and "gre_encrypters"'
        )

    @abstractmethod
    async def on_relayed_encryption_data(
        self, client: SatXMPPEntity, iq_elt: domish.Element, form: data_form.Form
    ) -> None:
        """Handle a relayed encryption data request.

        @param client: Client session which received the request.
        @param iq_elt: Received IQ request.
        @param form: Data form which is sent back in the IQ result.
        """
        raise NotImplementedError


class GRE:
    """Gateway Relayed Encryption plugin.

    The plugin registers itself as the "Relayed" encryption plugin and hooks the
    "send" trigger. Outgoing messages belonging to a relayed encryption session are
    formatted and encrypted with the Formatter and Encrypter advertised by the
    gateway, and the result is attached to the stanza in an ``<encrypted/>`` element.
    """

    namespace = NS_GRE

    def __init__(self, host: "LiberviaBackend") -> None:
        """Register the namespace, the encryption plugin and the "send" trigger.

        @param host: Libervia backend instance.
        """
        log.info(f"plugin {PLUGIN_INFO[C.PI_NAME]!r} initialization")
        self.host = host
        self._e = cast(XEP_0106, host.plugins["XEP-0106"])
        # Handlers answering data requests, one per client session.
        self._data_handlers: dict[SatXMPPEntity, GetDataHandler] = {}
        host.register_namespace("gre", NS_GRE)
        self.host.register_encryption_plugin(self, "Relayed", NS_GRE)
        host.trigger.add("send", self.send_trigger, priority=0)

    def register_get_data_handler(
        self, client: SatXMPPEntity, handler: GetDataHandler
    ) -> None:
        """Register the handler answering data requests for a client session.

        @param client: Client session the handler is registered for.
        @param handler: Handler to call when a data request is received.
        @raise exceptions.InternalError: A handler is already registered for this
            client.
        """
        if client in self._data_handlers:
            # Handlers are indexed by client, hence the wording of the message.
            raise exceptions.InternalError(
                '"register_get_data_handler" should not be called twice for the same '
                "client."
            )
        self._data_handlers[client] = handler

    def _on_component_data_request(
        self, iq_elt: domish.Element, client: SatXMPPEntity
    ) -> None:
        """Observer callback for data requests, see ``on_component_data_request``.

        The request is marked as handled, then processed asynchronously; the
        resulting Deferred is not awaited here.

        @param iq_elt: Received IQ request.
        @param client: Client session which received the request.
        """
        iq_elt.handled = True
        defer.ensureDeferred(self.on_component_data_request(client, iq_elt))

    async def on_component_data_request(
        self, client: SatXMPPEntity, iq_elt: domish.Element
    ) -> None:
        """Answer a data request with a relayed encryption data form.

        The handler registered for the client, if any, is called with the form
        before it is sent. Without a registered handler, the form is sent as
        created.

        @param client: Client session which received the request.
        @param iq_elt: Received IQ request.
        """
        form = data_form.Form(
            "result", "Relayed Data Encryption", formNamespace=NS_GRE_DATA
        )
        handler = self._data_handlers.get(client)
        if handler is not None:
            await handler.on_relayed_encryption_data(client, iq_elt, form)
        iq_result_elt = xmlstream.toResponse(iq_elt, "result")
        data_elt = iq_result_elt.addElement((NS_GRE, "data"))
        data_elt.addChild(form.toElement())
        client.send(iq_result_elt)

    async def get_formatter_and_encrypter(
        self, client: SatXMPPEntity, gateway_jid: jid.JID
    ) -> tuple[Formatter, Encrypter]:
        """Retrieve Formatter and Encrypter instances for given gateway.

        All the formatters and encrypters advertised by the gateway are considered,
        and the first ones (in namespace order) with a registered class are used.

        @param client: client session.
        @param gateway_jid: bare jid of the gateway.
        @return: Formatter and Encrypter instances.
        @raise exceptions.FeatureNotFound: No relevant Formatter or Encrypter could be
            found.
        """
        disco_infos = await self.host.memory.disco.get_infos(client, gateway_jid)
        # Namespaces are sorted so that the selection does not depend on the
        # iteration order of the advertised features.
        formatter_namespaces = sorted(
            f for f in disco_infos.features if f.startswith(NS_GRE_FORMATTER_PREFIX)
        )
        encrypter_namespaces = sorted(
            f for f in disco_infos.features if f.startswith(NS_GRE_ENCRYPTER_PREFIX)
        )
        if not formatter_namespaces or not encrypter_namespaces:
            raise exceptions.FeatureNotFound("No relayed encryption found.")

        # An unsupported namespace must not hide a supported one advertised next
        # to it, so every candidate is checked against the registries.
        formatter_cls = next(
            (
                Formatter.formatters_classes[ns]
                for ns in formatter_namespaces
                if ns in Formatter.formatters_classes
            ),
            None,
        )
        encrypter_cls = next(
            (
                Encrypter.encrypters_classes[ns]
                for ns in encrypter_namespaces
                if ns in Encrypter.encrypters_classes
            ),
            None,
        )
        if formatter_cls is None or encrypter_cls is None:
            raise exceptions.FeatureNotFound(
                "No compatible relayed encryption found."
            )

        return formatter_cls.get_instance(), encrypter_cls.get_instance()

    def get_encrypted_payload(
        self,
        message_elt: domish.Element,
    ) -> str | None:
        """Return encrypted payload if any.

        @param message_elt: The message element.
        @return: String conversion of the first ``<encrypted/>`` child in the GRE
            namespace, None if there is no such child.
        """
        encrypted_elt = next(message_elt.elements(NS_GRE, "encrypted"), None)
        if encrypted_elt is None:
            return None
        return str(encrypted_elt)

    async def send_trigger(
        self, client: SatXMPPEntity, stanza_elt: domish.Element
    ) -> bool:
        """Encrypt outgoing messages belonging to a relayed encryption session.

        Stanzas other than messages, and messages without a relayed encryption
        session with the recipient, are left untouched. Otherwise the ``<body/>``
        and ``<subject/>`` children are removed from the stanza, and an
        ``<encrypted/>`` element holding the encrypted payload is added.

        @param client: Profile session.
        @param stanza_elt: The stanza that is about to be sent.
        @return: Whether the send message flow should continue or not.
        @raise exceptions.InternalError: The message has no valid recipient.
        @raise exceptions.DataError: The recipient user part can't be unescaped.
        """
        if stanza_elt.name != "message":
            return True

        try:
            recipient = jid.JID(stanza_elt["to"])
        except (
            KeyError,
            jabber_error.StanzaError,
            RuntimeError,
            jid.InvalidFormat,
        ) as e:
            # KeyError is what domish raises when the "to" attribute is missing.
            raise exceptions.InternalError(
                "Message without recipient encountered. Blocking further processing to"
                f" avoid leaking plaintext data: {stanza_elt.toXml()}"
            ) from e

        recipient_bare_jid = recipient.userhostJID()

        encryption_session = client.encryption.getSession(recipient_bare_jid)
        if encryption_session is None:
            return True
        if encryption_session["plugin"].namespace != NS_GRE:
            return True

        # We are in a relayed encryption session.

        encryption_data_form = await self.get_data(client, recipient_bare_jid)

        formatter, encrypter = await self.get_formatter_and_encrypter(
            client, recipient_bare_jid
        )

        try:
            recipient_id = self._e.unescape(recipient.user)
        except ValueError as e:
            raise exceptions.DataError('"to" attribute is not in expected fomat') from e

        formatted_payload = await formatter.format(
            client, recipient_id, stanza_elt, encryption_data_form
        )
        encrypted_payload = await encrypter.encrypt(
            client, recipient_id, stanza_elt, formatted_payload, encryption_data_form
        )

        # Plaintext content is only removed once the encryption has succeeded.
        # The elements are collected in a list first, because the children are
        # modified while being walked through.
        for plain_name in ("body", "subject"):
            for plain_elt in list(stanza_elt.elements(None, plain_name)):
                stanza_elt.children.remove(plain_elt)

        encrypted_elt = stanza_elt.addElement(
            (NS_GRE, "encrypted"), content=encrypted_payload
        )
        encrypted_elt["formatter"] = formatter.namespace
        encrypted_elt["encrypter"] = encrypter.namespace

        return True

    async def get_data(
        self, client: SatXMPPEntity, recipient_jid: jid.JID
    ) -> data_form.Form:
        """Retrieve relayed encryption data form.

        @param client: Client session.
        @param recipient_jid: Bare jid of the entity to whom we want to send
            encrypted message.
        @return: Found data form.
        @raise exceptions.DataError: The ``<data/>`` element or the data form is
            missing from the response.
        """
        assert recipient_jid.resource is None, "recipient_jid must be a bare jid."
        iq_elt = client.IQ("get")
        iq_elt["to"] = recipient_jid.full()
        iq_elt.addElement((NS_GRE, "data"))
        iq_result_elt = await iq_elt.send()
        data_elt = next(iq_result_elt.elements(NS_GRE, "data"), None)
        if data_elt is None:
            raise exceptions.DataError(
                f"Relayed data payload is missing: {iq_result_elt.toXml()}"
            )
        form = data_form.findForm(data_elt, NS_GRE_DATA)
        if form is None:
            raise exceptions.DataError(
                f"Relayed data form is missing: {iq_result_elt.toXml()}"
            )
        return form

    async def get_trust_ui(
        self, client: SatXMPPEntity, entity: jid.JID
    ) -> xml_tools.XMLUI:
        """
        @param client: The client session.
        @param entity: The entity whose device trust levels to manage.
        @return: An XMLUI Dialog to handle trust for given entity.
        """
        # We just return an empty form for now.
        return xml_tools.XMLUI(C.XMLUI_FORM)

    def get_handler(self, client: SatXMPPEntity) -> XMPPHandler:
        """Return a new XMPP handler bound to this plugin.

        @param client: Client session (unused).
        @return: Handler instance.
        """
        return GREHandler(self)


@implementer(iwokkel.IDisco)
class GREHandler(XMPPHandler):
    """XMPP handler advertising GRE support and observing data requests."""

    def __init__(self, plugin_parent: GRE) -> None:
        """
        @param plugin_parent: Plugin instance whose callbacks are used.
        """
        # Base initialisation defines "parent" and "xmlstream", so they exist even
        # before the handler is attached to a stream manager.
        super().__init__()
        self.plugin_parent = plugin_parent

    def connectionInitialized(self):
        """Observe relayed encryption data requests when running as a component."""
        assert self.parent is not None and self.xmlstream is not None
        if self.parent.is_component:
            self.xmlstream.addObserver(
                IQ_DATA_REQUEST,
                self.plugin_parent._on_component_data_request,
                client=self.parent,
            )

    def getDiscoInfo(
        self, requestor: jid.JID, target: jid.JID, nodeIdentifier: str = ""
    ) -> list[disco.DiscoFeature]:
        """Advertise the GRE namespace.

        @param requestor: Entity which made the request (unused).
        @param target: Entity the request was sent to (unused).
        @param nodeIdentifier: Requested node (unused).
        @return: The GRE feature.
        """
        return [
            disco.DiscoFeature(NS_GRE),
        ]

    def getDiscoItems(
        self, requestor: jid.JID, target: jid.JID, nodeIdentifier: str = ""
    ) -> list[disco.DiscoItem]:
        """Return no item.

        @param requestor: Entity which made the request (unused).
        @param target: Entity the request was sent to (unused).
        @param nodeIdentifier: Requested node (unused).
        @return: An empty list.
        """
        return []