diff libervia/backend/plugins/plugin_exp_gre.py @ 4344:95f8309f86cf

plugin GRE: implements Gateway Relayed Encryption: rel 455
author Goffi <goffi@goffi.org>
date Mon, 13 Jan 2025 01:23:22 +0100 (8 days ago)
parents
children
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/libervia/backend/plugins/plugin_exp_gre.py	Mon Jan 13 01:23:22 2025 +0100
@@ -0,0 +1,382 @@
+#!/usr/bin/env python3
+
+# Libervia plugin
+# Copyright (C) 2009-2025 Jérôme Poisson (goffi@goffi.org)
+
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU Affero General Public License for more details.
+
+# You should have received a copy of the GNU Affero General Public License
+# along with this program.  If not, see <http://www.gnu.org/licenses/>.
+
+from abc import ABC, abstractmethod
+from typing import Final, TYPE_CHECKING, Self, Type, cast
+
+from twisted.internet import defer
+from twisted.words.protocols.jabber import jid, error as jabber_error
+from twisted.words.protocols.jabber import xmlstream
+from twisted.words.protocols.jabber.xmlstream import XMPPHandler
+from twisted.words.xish import domish
+from wokkel import data_form, disco, iwokkel
+from zope.interface import implementer
+
+from libervia.backend.core import exceptions
+from libervia.backend.core.constants import Const as C
+from libervia.backend.core.core_types import SatXMPPEntity
+from libervia.backend.core.i18n import _
+from libervia.backend.core.log import getLogger
+from libervia.backend.plugins.plugin_xep_0106 import XEP_0106
+from libervia.backend.tools import xml_tools
+
+if TYPE_CHECKING:
+    from libervia.backend.core.main import LiberviaBackend
+
+log = getLogger(__name__)
+
+
+PLUGIN_INFO = {
+    C.PI_NAME: "Gateway Relayer Encryption",
+    C.PI_IMPORT_NAME: "GRE",
+    C.PI_TYPE: "XEP",
+    C.PI_MODES: C.PLUG_MODE_BOTH,
+    C.PI_PROTOCOLS: [],
+    C.PI_DEPENDENCIES: ["XEP-0106"],
+    C.PI_RECOMMENDATIONS: [],
+    C.PI_MAIN: "GRE",
+    C.PI_HANDLER: "yes",
+    C.PI_DESCRIPTION: _(
+        "Handle formatting and encryption to support end-to-end encryption with gateways."
+    ),
+}
+
+NS_GRE_PREFIX: Final = "urn:xmpp:gre:"
+NS_GRE: Final = f"{NS_GRE_PREFIX}0"
+NS_GRE_FORMATTER_PREFIX: Final = f"{NS_GRE_PREFIX}formatter:"
+NS_GRE_ENCRYPTER_PREFIX: Final = f"{NS_GRE_PREFIX}encrypter:"
+NS_GRE_DATA: Final = f"{NS_GRE_PREFIX}data"
+
+IQ_DATA_REQUEST = C.IQ_GET + '/data[@xmlns="' + NS_GRE + '"]'
+
+
+class Formatter(ABC):
+
+    formatters_classes: dict[str, Type[Self]] = {}
+    name: str = ""
+    namespace: str = ""
+    _instance: Self | None = None
+
+    def __init_subclass__(cls, **kwargs) -> None:
+        """
+        Registers the subclass in the formatters dictionary.
+
+        @param kwargs: Additional keyword arguments.
+        """
+        assert cls.name and cls.namespace, "name and namespace must be set"
+        super().__init_subclass__(**kwargs)
+        cls.formatters_classes[cls.namespace] = cls
+
+    def __init__(self, host: "LiberviaBackend") -> None:
+        assert self.__class__._instance is None, "Formatter class must be singleton."
+        self.__class__._instance = self
+        self.host = host
+
+    @classmethod
+    def get_instance(cls) -> Self:
+        if cls._instance is None:
+            raise exceptions.InternalError("Formatter instance should be set.")
+        return cls._instance
+
+    @abstractmethod
+    async def format(
+        self,
+        client: SatXMPPEntity,
+        recipient_id: str,
+        message_elt: domish.Element,
+        encryption_data_form: data_form.Form,
+    ) -> bytes:
+        raise NotImplementedError
+
+
+class Encrypter(ABC):
+
+    encrypters_classes: dict[str, Type[Self]] = {}
+    name: str = ""
+    namespace: str = ""
+    _instance: Self | None = None
+
+    def __init_subclass__(cls, **kwargs) -> None:
+        """
+        Registers the subclass in the encrypters dictionary.
+
+        @param kwargs: Additional keyword arguments.
+        """
+        assert cls.name and cls.namespace, "name and namespace must be set"
+        super().__init_subclass__(**kwargs)
+        cls.encrypters_classes[cls.namespace] = cls
+
+    def __init__(self, host: "LiberviaBackend") -> None:
+        assert self.__class__._instance is None, "Encrypter class must be singleton."
+        self.__class__._instance = self
+        self.host = host
+
+    @classmethod
+    def get_instance(cls) -> Self:
+        if cls._instance is None:
+            raise exceptions.InternalError("Encrypter instance should be set.")
+        return cls._instance
+
+    @abstractmethod
+    async def encrypt(
+        self,
+        client: SatXMPPEntity,
+        recipient_id: str,
+        message_elt: domish.Element,
+        formatted_payload: bytes,
+        encryption_data_form: data_form.Form,
+    ) -> str:
+        raise NotImplementedError
+
+
+class GetDataHandler(ABC):
+    gre_formatters: list[str] = []
+    gre_encrypters: list[str] = []
+
+    def __init_subclass__(cls, **kwargs):
+        super().__init_subclass__(**kwargs)
+        if not cls.gre_formatters or not cls.gre_encrypters:
+            raise TypeError(
+                f'{cls.__name__} must define "gre_formatters" and "gre_encrypters"'
+            )
+
+    @abstractmethod
+    async def on_relayed_encryption_data(
+        self, client: SatXMPPEntity, iq_elt: domish.Element, form: data_form.Form
+    ) -> None:
+        raise NotImplementedError
+
+
+class GRE:
+    namespace = NS_GRE
+
+    def __init__(self, host: "LiberviaBackend") -> None:
+        log.info(f"plugin {PLUGIN_INFO[C.PI_NAME]!r} initialization")
+        self.host = host
+        self._e = cast(XEP_0106, host.plugins["XEP-0106"])
+        self._data_handlers: dict[SatXMPPEntity, GetDataHandler] = {}
+        host.register_namespace("gre", NS_GRE)
+        self.host.register_encryption_plugin(self, "Relayed", NS_GRE)
+        host.trigger.add("send", self.send_trigger, priority=0)
+
+    def register_get_data_handler(
+        self, client: SatXMPPEntity, handler: GetDataHandler
+    ) -> None:
+        if client in self._data_handlers:
+            raise exceptions.InternalError(
+                '"register_get_data_handler" should not be called twice for the same '
+                "handler."
+            )
+        self._data_handlers[client] = handler
+
+    def _on_component_data_request(
+        self, iq_elt: domish.Element, client: SatXMPPEntity
+    ) -> None:
+        iq_elt.handled = True
+        defer.ensureDeferred(self.on_component_data_request(client, iq_elt))
+
+    async def on_component_data_request(
+        self, client: SatXMPPEntity, iq_elt: domish.Element
+    ) -> None:
+        form = data_form.Form(
+            "result", "Relayed Data Encryption", formNamespace=NS_GRE_DATA
+        )
+        try:
+            handler = self._data_handlers[client]
+        except KeyError:
+            pass
+        else:
+            await handler.on_relayed_encryption_data(client, iq_elt, form)
+        iq_result_elt = xmlstream.toResponse(iq_elt, "result")
+        data_elt = iq_result_elt.addElement((NS_GRE, "data"))
+        data_elt.addChild(form.toElement())
+        client.send(iq_result_elt)
+
+    async def get_formatter_and_encrypter(
+        self, client: SatXMPPEntity, gateway_jid: jid.JID
+    ) -> tuple[Formatter, Encrypter]:
+        """Retrieve Formatter and Encrypter instances for given gateway.
+
+        @param client: client session.
+        @param gateway_jid: bare jid of the gateway.
+        @return: Formatter and Encrypter instances.
+        @raise exceptions.FeatureNotFound: No relevant Formatter or Encrypter could be
+            found.
+        """
+        disco_infos = await self.host.memory.disco.get_infos(client, gateway_jid)
+        try:
+            formatter_ns = next(
+                f for f in disco_infos.features if f.startswith(NS_GRE_FORMATTER_PREFIX)
+            )
+            encrypter_ns = next(
+                f for f in disco_infos.features if f.startswith(NS_GRE_ENCRYPTER_PREFIX)
+            )
+            formatter_cls = Formatter.formatters_classes[formatter_ns]
+            encrypter_cls = Encrypter.encrypters_classes[encrypter_ns]
+        except StopIteration as e:
+            raise exceptions.FeatureNotFound("No relayed encryption found.") from e
+        except KeyError as e:
+            raise exceptions.FeatureNotFound(
+                "No compatible relayed encryption found."
+            ) from e
+
+        return formatter_cls.get_instance(), encrypter_cls.get_instance()
+
+    def get_encrypted_payload(
+        self,
+        message_elt: domish.Element,
+    ) -> str | None:
+        """Return encrypted payload if any.
+
+        @param message_elt: The message element.
+        @return: Encrypted payload if any, None otherwise.
+        """
+        encrypted_elt = next(message_elt.elements(NS_GRE, "encrypted"), None)
+        if encrypted_elt is None:
+            return None
+        return str(encrypted_elt)
+
+    async def send_trigger(
+        self, client: SatXMPPEntity, stanza_elt: domish.Element
+    ) -> bool:
+        """
+        @param client: Profile session.
+        @param stanza: The stanza that is about to be sent.
+        @return: Whether the send message flow should continue or not.
+        """
+        if stanza_elt.name != "message":
+            return True
+
+        try:
+            recipient = jid.JID(stanza_elt["to"])
+        except (jabber_error.StanzaError, RuntimeError, jid.InvalidFormat) as e:
+            raise exceptions.InternalError(
+                "Message without recipient encountered. Blocking further processing to"
+                f" avoid leaking plaintext data: {stanza_elt.toXml()}"
+            ) from e
+
+        recipient_bare_jid = recipient.userhostJID()
+
+        encryption_session = client.encryption.getSession(recipient_bare_jid)
+        if encryption_session is None:
+            return True
+        if encryption_session["plugin"].namespace != NS_GRE:
+            return True
+
+        # We are in a relayed encryption session.
+
+        encryption_data_form = await self.get_data(client, recipient_bare_jid)
+
+        formatter, encrypter = await self.get_formatter_and_encrypter(
+            client, recipient_bare_jid
+        )
+
+        try:
+            recipient_id = self._e.unescape(recipient.user)
+        except ValueError as e:
+            raise exceptions.DataError('"to" attribute is not in expected fomat') from e
+
+        formatted_payload = await formatter.format(
+            client, recipient_id, stanza_elt, encryption_data_form
+        )
+        encrypted_payload = await encrypter.encrypt(
+            client, recipient_id, stanza_elt, formatted_payload, encryption_data_form
+        )
+
+        for body_elt in list(stanza_elt.elements(None, "body")):
+            stanza_elt.children.remove(body_elt)
+        for subject_elt in list(stanza_elt.elements(None, "subject")):
+            stanza_elt.children.remove(subject_elt)
+
+        encrypted_elt = stanza_elt.addElement(
+            (NS_GRE, "encrypted"), content=encrypted_payload
+        )
+        encrypted_elt["formatter"] = formatter.namespace
+        encrypted_elt["encrypter"] = encrypter.namespace
+
+        return True
+
+    async def get_data(
+        self, client: SatXMPPEntity, recipient_jid: jid.JID
+    ) -> data_form.Form:
+        """Retrieve relayed encryption data form.
+
+        @param client: Client session.
+        @param recipient_id: Bare jid of the entity to whom we want to send encrypted
+            mesasge.
+        @return: Found data form, or None if no data form has been found.
+        """
+        assert recipient_jid.resource is None, "recipient_jid must be a bare jid."
+        iq_elt = client.IQ("get")
+        iq_elt["to"] = recipient_jid.full()
+        data_elt = iq_elt.addElement((NS_GRE, "data"))
+        iq_result_elt = await iq_elt.send()
+        try:
+            data_elt = next(iq_result_elt.elements(NS_GRE, "data"))
+        except StopIteration:
+            raise exceptions.DataError(
+                f"Relayed data payload is missing: {iq_result_elt.toXml()}"
+            )
+        form = data_form.findForm(data_elt, NS_GRE_DATA)
+        if form is None:
+            raise exceptions.DataError(
+                f"Relayed data form is missing: {iq_result_elt.toXml()}"
+            )
+        return form
+
+    async def get_trust_ui(
+        self, client: SatXMPPEntity, entity: jid.JID
+    ) -> xml_tools.XMLUI:
+        """
+        @param client: The client session.
+        @param entity: The entity whose device trust levels to manage.
+        @return: An XMLUI Dialog to handle trust for given entity.
+        """
+        # We just return an enmpty form for now.
+        return xml_tools.XMLUI(C.XMLUI_FORM)
+
+    def get_handler(self, client: SatXMPPEntity) -> XMPPHandler:
+        return GREHandler(self)
+
+
+@implementer(iwokkel.IDisco)
+class GREHandler(XMPPHandler):
+
+    def __init__(self, plugin_parent: GRE) -> None:
+        self.plugin_parent = plugin_parent
+
+    def connectionInitialized(self):
+        assert self.parent is not None and self.xmlstream is not None
+        if self.parent.is_component:
+            self.xmlstream.addObserver(
+                IQ_DATA_REQUEST,
+                self.plugin_parent._on_component_data_request,
+                client=self.parent,
+            )
+
+    def getDiscoInfo(
+        self, requestor: jid.JID, target: jid.JID, nodeIdentifier: str = ""
+    ) -> list[disco.DiscoFeature]:
+        return [
+            disco.DiscoFeature(NS_GRE),
+        ]
+
+    def getDiscoItems(
+        self, requestor: jid.JID, target: jid.JID, nodeIdentifier: str = ""
+    ) -> list[disco.DiscoItems]:
+        return []