view libervia/backend/plugins/plugin_exp_gre.py @ 4348:35d41de5b2aa default tip @

doc (component): document use of Gateway Relayed Encryption: fix 455
author Goffi <goffi@goffi.org>
date Mon, 13 Jan 2025 01:23:22 +0100
parents 95f8309f86cf
children
line wrap: on
line source

#!/usr/bin/env python3

# Libervia plugin
# Copyright (C) 2009-2025 Jérôme Poisson (goffi@goffi.org)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

from abc import ABC, abstractmethod
from typing import Final, TYPE_CHECKING, Self, Type, cast

from twisted.internet import defer
from twisted.words.protocols.jabber import jid, error as jabber_error
from twisted.words.protocols.jabber import xmlstream
from twisted.words.protocols.jabber.xmlstream import XMPPHandler
from twisted.words.xish import domish
from wokkel import data_form, disco, iwokkel
from zope.interface import implementer

from libervia.backend.core import exceptions
from libervia.backend.core.constants import Const as C
from libervia.backend.core.core_types import SatXMPPEntity
from libervia.backend.core.i18n import _
from libervia.backend.core.log import getLogger
from libervia.backend.plugins.plugin_xep_0106 import XEP_0106
from libervia.backend.tools import xml_tools

if TYPE_CHECKING:
    from libervia.backend.core.main import LiberviaBackend

log = getLogger(__name__)


PLUGIN_INFO = {
    C.PI_NAME: "Gateway Relayer Encryption",
    C.PI_IMPORT_NAME: "GRE",
    C.PI_TYPE: "XEP",
    C.PI_MODES: C.PLUG_MODE_BOTH,
    C.PI_PROTOCOLS: [],
    C.PI_DEPENDENCIES: ["XEP-0106"],
    C.PI_RECOMMENDATIONS: [],
    C.PI_MAIN: "GRE",
    C.PI_HANDLER: "yes",
    C.PI_DESCRIPTION: _(
        "Handle formatting and encryption to support end-to-end encryption with gateways."
    ),
}

NS_GRE_PREFIX: Final = "urn:xmpp:gre:"
NS_GRE: Final = f"{NS_GRE_PREFIX}0"
NS_GRE_FORMATTER_PREFIX: Final = f"{NS_GRE_PREFIX}formatter:"
NS_GRE_ENCRYPTER_PREFIX: Final = f"{NS_GRE_PREFIX}encrypter:"
NS_GRE_DATA: Final = f"{NS_GRE_PREFIX}data"

IQ_DATA_REQUEST = C.IQ_GET + '/data[@xmlns="' + NS_GRE + '"]'


class Formatter(ABC):

    formatters_classes: dict[str, Type[Self]] = {}
    name: str = ""
    namespace: str = ""
    _instance: Self | None = None

    def __init_subclass__(cls, **kwargs) -> None:
        """
        Registers the subclass in the formatters dictionary.

        @param kwargs: Additional keyword arguments.
        """
        assert cls.name and cls.namespace, "name and namespace must be set"
        super().__init_subclass__(**kwargs)
        cls.formatters_classes[cls.namespace] = cls

    def __init__(self, host: "LiberviaBackend") -> None:
        assert self.__class__._instance is None, "Formatter class must be singleton."
        self.__class__._instance = self
        self.host = host

    @classmethod
    def get_instance(cls) -> Self:
        if cls._instance is None:
            raise exceptions.InternalError("Formatter instance should be set.")
        return cls._instance

    @abstractmethod
    async def format(
        self,
        client: SatXMPPEntity,
        recipient_id: str,
        message_elt: domish.Element,
        encryption_data_form: data_form.Form,
    ) -> bytes:
        raise NotImplementedError


class Encrypter(ABC):

    encrypters_classes: dict[str, Type[Self]] = {}
    name: str = ""
    namespace: str = ""
    _instance: Self | None = None

    def __init_subclass__(cls, **kwargs) -> None:
        """
        Registers the subclass in the encrypters dictionary.

        @param kwargs: Additional keyword arguments.
        """
        assert cls.name and cls.namespace, "name and namespace must be set"
        super().__init_subclass__(**kwargs)
        cls.encrypters_classes[cls.namespace] = cls

    def __init__(self, host: "LiberviaBackend") -> None:
        assert self.__class__._instance is None, "Encrypter class must be singleton."
        self.__class__._instance = self
        self.host = host

    @classmethod
    def get_instance(cls) -> Self:
        if cls._instance is None:
            raise exceptions.InternalError("Encrypter instance should be set.")
        return cls._instance

    @abstractmethod
    async def encrypt(
        self,
        client: SatXMPPEntity,
        recipient_id: str,
        message_elt: domish.Element,
        formatted_payload: bytes,
        encryption_data_form: data_form.Form,
    ) -> str:
        raise NotImplementedError


class GetDataHandler(ABC):
    gre_formatters: list[str] = []
    gre_encrypters: list[str] = []

    def __init_subclass__(cls, **kwargs):
        super().__init_subclass__(**kwargs)
        if not cls.gre_formatters or not cls.gre_encrypters:
            raise TypeError(
                f'{cls.__name__} must define "gre_formatters" and "gre_encrypters"'
            )

    @abstractmethod
    async def on_relayed_encryption_data(
        self, client: SatXMPPEntity, iq_elt: domish.Element, form: data_form.Form
    ) -> None:
        raise NotImplementedError


class GRE:
    namespace = NS_GRE

    def __init__(self, host: "LiberviaBackend") -> None:
        log.info(f"plugin {PLUGIN_INFO[C.PI_NAME]!r} initialization")
        self.host = host
        self._e = cast(XEP_0106, host.plugins["XEP-0106"])
        self._data_handlers: dict[SatXMPPEntity, GetDataHandler] = {}
        host.register_namespace("gre", NS_GRE)
        self.host.register_encryption_plugin(self, "Relayed", NS_GRE)
        host.trigger.add("send", self.send_trigger, priority=0)

    def register_get_data_handler(
        self, client: SatXMPPEntity, handler: GetDataHandler
    ) -> None:
        if client in self._data_handlers:
            raise exceptions.InternalError(
                '"register_get_data_handler" should not be called twice for the same '
                "handler."
            )
        self._data_handlers[client] = handler

    def _on_component_data_request(
        self, iq_elt: domish.Element, client: SatXMPPEntity
    ) -> None:
        iq_elt.handled = True
        defer.ensureDeferred(self.on_component_data_request(client, iq_elt))

    async def on_component_data_request(
        self, client: SatXMPPEntity, iq_elt: domish.Element
    ) -> None:
        form = data_form.Form(
            "result", "Relayed Data Encryption", formNamespace=NS_GRE_DATA
        )
        try:
            handler = self._data_handlers[client]
        except KeyError:
            pass
        else:
            await handler.on_relayed_encryption_data(client, iq_elt, form)
        iq_result_elt = xmlstream.toResponse(iq_elt, "result")
        data_elt = iq_result_elt.addElement((NS_GRE, "data"))
        data_elt.addChild(form.toElement())
        client.send(iq_result_elt)

    async def get_formatter_and_encrypter(
        self, client: SatXMPPEntity, gateway_jid: jid.JID
    ) -> tuple[Formatter, Encrypter]:
        """Retrieve Formatter and Encrypter instances for given gateway.

        @param client: client session.
        @param gateway_jid: bare jid of the gateway.
        @return: Formatter and Encrypter instances.
        @raise exceptions.FeatureNotFound: No relevant Formatter or Encrypter could be
            found.
        """
        disco_infos = await self.host.memory.disco.get_infos(client, gateway_jid)
        try:
            formatter_ns = next(
                f for f in disco_infos.features if f.startswith(NS_GRE_FORMATTER_PREFIX)
            )
            encrypter_ns = next(
                f for f in disco_infos.features if f.startswith(NS_GRE_ENCRYPTER_PREFIX)
            )
            formatter_cls = Formatter.formatters_classes[formatter_ns]
            encrypter_cls = Encrypter.encrypters_classes[encrypter_ns]
        except StopIteration as e:
            raise exceptions.FeatureNotFound("No relayed encryption found.") from e
        except KeyError as e:
            raise exceptions.FeatureNotFound(
                "No compatible relayed encryption found."
            ) from e

        return formatter_cls.get_instance(), encrypter_cls.get_instance()

    def get_encrypted_payload(
        self,
        message_elt: domish.Element,
    ) -> str | None:
        """Return encrypted payload if any.

        @param message_elt: The message element.
        @return: Encrypted payload if any, None otherwise.
        """
        encrypted_elt = next(message_elt.elements(NS_GRE, "encrypted"), None)
        if encrypted_elt is None:
            return None
        return str(encrypted_elt)

    async def send_trigger(
        self, client: SatXMPPEntity, stanza_elt: domish.Element
    ) -> bool:
        """
        @param client: Profile session.
        @param stanza: The stanza that is about to be sent.
        @return: Whether the send message flow should continue or not.
        """
        if stanza_elt.name != "message":
            return True

        try:
            recipient = jid.JID(stanza_elt["to"])
        except (jabber_error.StanzaError, RuntimeError, jid.InvalidFormat) as e:
            raise exceptions.InternalError(
                "Message without recipient encountered. Blocking further processing to"
                f" avoid leaking plaintext data: {stanza_elt.toXml()}"
            ) from e

        recipient_bare_jid = recipient.userhostJID()

        encryption_session = client.encryption.getSession(recipient_bare_jid)
        if encryption_session is None:
            return True
        if encryption_session["plugin"].namespace != NS_GRE:
            return True

        # We are in a relayed encryption session.

        encryption_data_form = await self.get_data(client, recipient_bare_jid)

        formatter, encrypter = await self.get_formatter_and_encrypter(
            client, recipient_bare_jid
        )

        try:
            recipient_id = self._e.unescape(recipient.user)
        except ValueError as e:
            raise exceptions.DataError('"to" attribute is not in expected fomat') from e

        formatted_payload = await formatter.format(
            client, recipient_id, stanza_elt, encryption_data_form
        )
        encrypted_payload = await encrypter.encrypt(
            client, recipient_id, stanza_elt, formatted_payload, encryption_data_form
        )

        for body_elt in list(stanza_elt.elements(None, "body")):
            stanza_elt.children.remove(body_elt)
        for subject_elt in list(stanza_elt.elements(None, "subject")):
            stanza_elt.children.remove(subject_elt)

        encrypted_elt = stanza_elt.addElement(
            (NS_GRE, "encrypted"), content=encrypted_payload
        )
        encrypted_elt["formatter"] = formatter.namespace
        encrypted_elt["encrypter"] = encrypter.namespace

        return True

    async def get_data(
        self, client: SatXMPPEntity, recipient_jid: jid.JID
    ) -> data_form.Form:
        """Retrieve relayed encryption data form.

        @param client: Client session.
        @param recipient_id: Bare jid of the entity to whom we want to send encrypted
            mesasge.
        @return: Found data form, or None if no data form has been found.
        """
        assert recipient_jid.resource is None, "recipient_jid must be a bare jid."
        iq_elt = client.IQ("get")
        iq_elt["to"] = recipient_jid.full()
        data_elt = iq_elt.addElement((NS_GRE, "data"))
        iq_result_elt = await iq_elt.send()
        try:
            data_elt = next(iq_result_elt.elements(NS_GRE, "data"))
        except StopIteration:
            raise exceptions.DataError(
                f"Relayed data payload is missing: {iq_result_elt.toXml()}"
            )
        form = data_form.findForm(data_elt, NS_GRE_DATA)
        if form is None:
            raise exceptions.DataError(
                f"Relayed data form is missing: {iq_result_elt.toXml()}"
            )
        return form

    async def get_trust_ui(
        self, client: SatXMPPEntity, entity: jid.JID
    ) -> xml_tools.XMLUI:
        """
        @param client: The client session.
        @param entity: The entity whose device trust levels to manage.
        @return: An XMLUI Dialog to handle trust for given entity.
        """
        # We just return an enmpty form for now.
        return xml_tools.XMLUI(C.XMLUI_FORM)

    def get_handler(self, client: SatXMPPEntity) -> XMPPHandler:
        return GREHandler(self)


@implementer(iwokkel.IDisco)
class GREHandler(XMPPHandler):

    def __init__(self, plugin_parent: GRE) -> None:
        self.plugin_parent = plugin_parent

    def connectionInitialized(self):
        assert self.parent is not None and self.xmlstream is not None
        if self.parent.is_component:
            self.xmlstream.addObserver(
                IQ_DATA_REQUEST,
                self.plugin_parent._on_component_data_request,
                client=self.parent,
            )

    def getDiscoInfo(
        self, requestor: jid.JID, target: jid.JID, nodeIdentifier: str = ""
    ) -> list[disco.DiscoFeature]:
        return [
            disco.DiscoFeature(NS_GRE),
        ]

    def getDiscoItems(
        self, requestor: jid.JID, target: jid.JID, nodeIdentifier: str = ""
    ) -> list[disco.DiscoItems]:
        return []