Mercurial > libervia-backend
view libervia/backend/plugins/plugin_exp_gre.py @ 4344:95f8309f86cf
plugin GRE: implements Gateway Relayed Encryption:
rel 455
author | Goffi <goffi@goffi.org> |
---|---|
date | Mon, 13 Jan 2025 01:23:22 +0100 |
parents | |
children |
line wrap: on
line source
#!/usr/bin/env python3

# Libervia plugin
# Copyright (C) 2009-2025 Jérôme Poisson (goffi@goffi.org)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

from abc import ABC, abstractmethod
from typing import Final, TYPE_CHECKING, Self, Type, cast

from twisted.internet import defer
from twisted.words.protocols.jabber import jid, error as jabber_error
from twisted.words.protocols.jabber import xmlstream
from twisted.words.protocols.jabber.xmlstream import XMPPHandler
from twisted.words.xish import domish
from wokkel import data_form, disco, iwokkel
from zope.interface import implementer

from libervia.backend.core import exceptions
from libervia.backend.core.constants import Const as C
from libervia.backend.core.core_types import SatXMPPEntity
from libervia.backend.core.i18n import _
from libervia.backend.core.log import getLogger
from libervia.backend.plugins.plugin_xep_0106 import XEP_0106
from libervia.backend.tools import xml_tools

if TYPE_CHECKING:
    from libervia.backend.core.main import LiberviaBackend

log = getLogger(__name__)


PLUGIN_INFO = {
    C.PI_NAME: "Gateway Relayer Encryption",
    C.PI_IMPORT_NAME: "GRE",
    C.PI_TYPE: "XEP",
    C.PI_MODES: C.PLUG_MODE_BOTH,
    C.PI_PROTOCOLS: [],
    C.PI_DEPENDENCIES: ["XEP-0106"],
    C.PI_RECOMMENDATIONS: [],
    C.PI_MAIN: "GRE",
    C.PI_HANDLER: "yes",
    C.PI_DESCRIPTION: _(
        "Handle formatting and encryption to support end-to-end encryption with gateways."
    ),
}

# Namespaces. Formatter and encrypter namespaces advertised by a gateway are recognised
# by their prefix (see GRE.get_formatter_and_encrypter).
NS_GRE_PREFIX: Final = "urn:xmpp:gre:"
NS_GRE: Final = f"{NS_GRE_PREFIX}0"
NS_GRE_FORMATTER_PREFIX: Final = f"{NS_GRE_PREFIX}formatter:"
NS_GRE_ENCRYPTER_PREFIX: Final = f"{NS_GRE_PREFIX}encrypter:"
NS_GRE_DATA: Final = f"{NS_GRE_PREFIX}data"

# XPath-like observer expression matching IQ "get" requests with a <data/> child in the
# GRE namespace.
IQ_DATA_REQUEST = C.IQ_GET + '/data[@xmlns="' + NS_GRE + '"]'


class Formatter(ABC):
    """Base class of payload formatters.

    Every subclass must set ``name`` and ``namespace``; it is then registered in
    ``formatters_classes`` under its namespace. At most one instance per class may be
    created, and it can be retrieved with [get_instance].
    """

    # Registry shared by all subclasses: namespace => formatter class.
    formatters_classes: dict[str, Type[Self]] = {}
    name: str = ""
    namespace: str = ""
    _instance: Self | None = None

    def __init_subclass__(cls, **kwargs) -> None:
        """Register the subclass in the formatters dictionary.

        @param kwargs: Additional keyword arguments, forwarded to the parent
            implementation.
        """
        assert cls.name and cls.namespace, "name and namespace must be set"
        super().__init_subclass__(**kwargs)
        cls.formatters_classes[cls.namespace] = cls

    def __init__(self, host: "LiberviaBackend") -> None:
        """Store the instance as the class singleton.

        @param host: Libervia backend instance.
        """
        assert self.__class__._instance is None, "Formatter class must be singleton."
        self.__class__._instance = self
        self.host = host

    @classmethod
    def get_instance(cls) -> Self:
        """Return the instance created for this class.

        @return: The singleton instance.
        @raise exceptions.InternalError: No instance has been created yet.
        """
        if cls._instance is None:
            raise exceptions.InternalError("Formatter instance should be set.")
        return cls._instance

    @abstractmethod
    async def format(
        self,
        client: SatXMPPEntity,
        recipient_id: str,
        message_elt: domish.Element,
        encryption_data_form: data_form.Form,
    ) -> bytes:
        """Format the message into the payload which will be encrypted.

        @param client: Client session.
        @param recipient_id: Unescaped user part of the recipient JID.
        @param message_elt: The message stanza being sent.
        @param encryption_data_form: Data form retrieved from the gateway.
        @return: Formatted payload.
        """
        raise NotImplementedError


class Encrypter(ABC):
    """Base class of payload encrypters.

    Every subclass must set ``name`` and ``namespace``; it is then registered in
    ``encrypters_classes`` under its namespace. At most one instance per class may be
    created, and it can be retrieved with [get_instance].
    """

    # Registry shared by all subclasses: namespace => encrypter class.
    encrypters_classes: dict[str, Type[Self]] = {}
    name: str = ""
    namespace: str = ""
    _instance: Self | None = None

    def __init_subclass__(cls, **kwargs) -> None:
        """Register the subclass in the encrypters dictionary.

        @param kwargs: Additional keyword arguments, forwarded to the parent
            implementation.
        """
        assert cls.name and cls.namespace, "name and namespace must be set"
        super().__init_subclass__(**kwargs)
        cls.encrypters_classes[cls.namespace] = cls

    def __init__(self, host: "LiberviaBackend") -> None:
        """Store the instance as the class singleton.

        @param host: Libervia backend instance.
        """
        assert self.__class__._instance is None, "Encrypter class must be singleton."
        self.__class__._instance = self
        self.host = host

    @classmethod
    def get_instance(cls) -> Self:
        """Return the instance created for this class.

        @return: The singleton instance.
        @raise exceptions.InternalError: No instance has been created yet.
        """
        if cls._instance is None:
            raise exceptions.InternalError("Encrypter instance should be set.")
        return cls._instance

    @abstractmethod
    async def encrypt(
        self,
        client: SatXMPPEntity,
        recipient_id: str,
        message_elt: domish.Element,
        formatted_payload: bytes,
        encryption_data_form: data_form.Form,
    ) -> str:
        """Encrypt a formatted payload.

        @param client: Client session.
        @param recipient_id: Unescaped user part of the recipient JID.
        @param message_elt: The message stanza being sent.
        @param formatted_payload: Payload returned by the formatter.
        @param encryption_data_form: Data form retrieved from the gateway.
        @return: Encrypted payload, used as content of the <encrypted/> element.
        """
        raise NotImplementedError


class GetDataHandler(ABC):
    """Base class of handlers answering relayed encryption data requests.

    Subclasses must define non-empty ``gre_formatters`` and ``gre_encrypters``.
    """

    gre_formatters: list[str] = []
    gre_encrypters: list[str] = []

    def __init_subclass__(cls, **kwargs):
        """Check that the subclass declares its formatters and encrypters.

        @param kwargs: Additional keyword arguments, forwarded to the parent
            implementation.
        @raise TypeError: "gre_formatters" or "gre_encrypters" is empty.
        """
        super().__init_subclass__(**kwargs)
        if not cls.gre_formatters or not cls.gre_encrypters:
            raise TypeError(
                f'{cls.__name__} must define "gre_formatters" and "gre_encrypters"'
            )

    @abstractmethod
    async def on_relayed_encryption_data(
        self, client: SatXMPPEntity, iq_elt: domish.Element, form: data_form.Form
    ) -> None:
        """Called when a data request is received, before the result is sent.

        @param client: Client session which received the request.
        @param iq_elt: The IQ request.
        @param form: Result form which will be sent back in the response.
            Presumably the handler fills it with its encryption data.
        """
        raise NotImplementedError


class GRE:
    """Gateway Relayed Encryption plugin."""

    namespace = NS_GRE

    def __init__(self, host: "LiberviaBackend") -> None:
        """Register the namespace, the encryption plugin and the "send" trigger.

        @param host: Libervia backend instance.
        """
        log.info(f"plugin {PLUGIN_INFO[C.PI_NAME]!r} initialization")
        self.host = host
        self._e = cast(XEP_0106, host.plugins["XEP-0106"])
        self._data_handlers: dict[SatXMPPEntity, GetDataHandler] = {}
        host.register_namespace("gre", NS_GRE)
        self.host.register_encryption_plugin(self, "Relayed", NS_GRE)
        host.trigger.add("send", self.send_trigger, priority=0)

    def register_get_data_handler(
        self, client: SatXMPPEntity, handler: GetDataHandler
    ) -> None:
        """Register the handler used to answer data requests for a client.

        @param client: Client session the handler is attached to.
        @param handler: Handler to call on data requests.
        @raise exceptions.InternalError: A handler is already registered for this
            client.
        """
        if client in self._data_handlers:
            raise exceptions.InternalError(
                '"register_get_data_handler" should not be called twice for the same '
                "handler."
            )
        self._data_handlers[client] = handler

    def _on_component_data_request(
        self, iq_elt: domish.Element, client: SatXMPPEntity
    ) -> None:
        """Observer callback for data requests.

        Mark the IQ as handled and run [on_component_data_request].

        @param iq_elt: The IQ request.
        @param client: Client session which received the request.
        """
        iq_elt.handled = True
        defer.ensureDeferred(self.on_component_data_request(client, iq_elt))

    async def on_component_data_request(
        self, client: SatXMPPEntity, iq_elt: domish.Element
    ) -> None:
        """Answer a data request with a result form.

        If a handler is registered for the client, it is called with the form before
        the response is sent; otherwise the form is sent as created here.

        @param client: Client session which received the request.
        @param iq_elt: The IQ request.
        """
        form = data_form.Form(
            "result", "Relayed Data Encryption", formNamespace=NS_GRE_DATA
        )
        try:
            handler = self._data_handlers[client]
        except KeyError:
            # No handler registered for this client: the form is sent untouched.
            pass
        else:
            await handler.on_relayed_encryption_data(client, iq_elt, form)
        iq_result_elt = xmlstream.toResponse(iq_elt, "result")
        data_elt = iq_result_elt.addElement((NS_GRE, "data"))
        data_elt.addChild(form.toElement())
        client.send(iq_result_elt)

    async def get_formatter_and_encrypter(
        self, client: SatXMPPEntity, gateway_jid: jid.JID
    ) -> tuple[Formatter, Encrypter]:
        """Retrieve Formatter and Encrypter instances for given gateway.

        The first disco feature matching each namespace prefix is used.

        @param client: client session.
        @param gateway_jid: bare jid of the gateway.
        @return: Formatter and Encrypter instances.
        @raise exceptions.FeatureNotFound: No relevant Formatter or Encrypter could be
            found.
        """
        disco_infos = await self.host.memory.disco.get_infos(client, gateway_jid)
        try:
            formatter_ns = next(
                f for f in disco_infos.features if f.startswith(NS_GRE_FORMATTER_PREFIX)
            )
            encrypter_ns = next(
                f for f in disco_infos.features if f.startswith(NS_GRE_ENCRYPTER_PREFIX)
            )
            formatter_cls = Formatter.formatters_classes[formatter_ns]
            encrypter_cls = Encrypter.encrypters_classes[encrypter_ns]
        except StopIteration as e:
            # The gateway advertises no formatter or no encrypter at all.
            raise exceptions.FeatureNotFound("No relayed encryption found.") from e
        except KeyError as e:
            # The gateway advertises one, but no matching class is registered here.
            raise exceptions.FeatureNotFound(
                "No compatible relayed encryption found."
            ) from e

        return formatter_cls.get_instance(), encrypter_cls.get_instance()

    def get_encrypted_payload(
        self,
        message_elt: domish.Element,
    ) -> str | None:
        """Return encrypted payload if any.

        @param message_elt: The message element.
        @return: Encrypted payload if any, None otherwise.
        """
        encrypted_elt = next(message_elt.elements(NS_GRE, "encrypted"), None)
        if encrypted_elt is None:
            return None
        return str(encrypted_elt)

    async def send_trigger(
        self, client: SatXMPPEntity, stanza_elt: domish.Element
    ) -> bool:
        """Encrypt outgoing messages which are part of a relayed encryption session.

        <body/> and <subject/> elements are removed from the stanza, and an
        <encrypted/> element holding the encrypted payload is added.

        @param client: Profile session.
        @param stanza_elt: The stanza that is about to be sent.
        @return: Whether the send message flow should continue or not.
        @raise exceptions.InternalError: The message has no usable recipient.
        @raise exceptions.DataError: The user part of the recipient can't be unescaped.
        """
        if stanza_elt.name != "message":
            return True

        try:
            recipient = jid.JID(stanza_elt["to"])
        except (
            # KeyError is raised when the "to" attribute is missing.
            KeyError,
            jabber_error.StanzaError,
            RuntimeError,
            jid.InvalidFormat,
        ) as e:
            raise exceptions.InternalError(
                "Message without recipient encountered. Blocking further processing to"
                f" avoid leaking plaintext data: {stanza_elt.toXml()}"
            ) from e

        recipient_bare_jid = recipient.userhostJID()

        encryption_session = client.encryption.getSession(recipient_bare_jid)
        if encryption_session is None:
            return True
        if encryption_session["plugin"].namespace != NS_GRE:
            return True

        # We are in a relayed encryption session.

        encryption_data_form = await self.get_data(client, recipient_bare_jid)

        formatter, encrypter = await self.get_formatter_and_encrypter(
            client, recipient_bare_jid
        )

        try:
            recipient_id = self._e.unescape(recipient.user)
        except ValueError as e:
            raise exceptions.DataError('"to" attribute is not in expected fomat') from e

        formatted_payload = await formatter.format(
            client, recipient_id, stanza_elt, encryption_data_form
        )
        encrypted_payload = await encrypter.encrypt(
            client, recipient_id, stanza_elt, formatted_payload, encryption_data_form
        )

        # Plaintext content must not be sent along the encrypted payload. We iterate
        # over copies as children are removed during the loop.
        for body_elt in list(stanza_elt.elements(None, "body")):
            stanza_elt.children.remove(body_elt)
        for subject_elt in list(stanza_elt.elements(None, "subject")):
            stanza_elt.children.remove(subject_elt)

        encrypted_elt = stanza_elt.addElement(
            (NS_GRE, "encrypted"), content=encrypted_payload
        )
        encrypted_elt["formatter"] = formatter.namespace
        encrypted_elt["encrypter"] = encrypter.namespace

        return True

    async def get_data(
        self, client: SatXMPPEntity, recipient_jid: jid.JID
    ) -> data_form.Form:
        """Retrieve relayed encryption data form.

        @param client: Client session.
        @param recipient_jid: Bare jid of the entity to whom we want to send encrypted
            message.
        @return: Found data form.
        @raise exceptions.DataError: The <data/> element or the data form is missing
            from the IQ result.
        """
        assert recipient_jid.resource is None, "recipient_jid must be a bare jid."
        iq_elt = client.IQ("get")
        iq_elt["to"] = recipient_jid.full()
        iq_elt.addElement((NS_GRE, "data"))
        iq_result_elt = await iq_elt.send()
        try:
            data_elt = next(iq_result_elt.elements(NS_GRE, "data"))
        except StopIteration:
            raise exceptions.DataError(
                f"Relayed data payload is missing: {iq_result_elt.toXml()}"
            )
        form = data_form.findForm(data_elt, NS_GRE_DATA)
        if form is None:
            raise exceptions.DataError(
                f"Relayed data form is missing: {iq_result_elt.toXml()}"
            )
        return form

    async def get_trust_ui(
        self, client: SatXMPPEntity, entity: jid.JID
    ) -> xml_tools.XMLUI:
        """
        @param client: The client session.
        @param entity: The entity whose device trust levels to manage.
        @return: An XMLUI Dialog to handle trust for given entity.
        """
        # We just return an empty form for now.
        return xml_tools.XMLUI(C.XMLUI_FORM)

    def get_handler(self, client: SatXMPPEntity) -> XMPPHandler:
        """Return the XMPP handler of this plugin.

        @param client: Client session.
        @return: A new GREHandler instance.
        """
        return GREHandler(self)


@implementer(iwokkel.IDisco)
class GREHandler(XMPPHandler):
    """XMPP handler: observes data requests for components, and advertises NS_GRE."""

    def __init__(self, plugin_parent: GRE) -> None:
        self.plugin_parent = plugin_parent

    def connectionInitialized(self):
        """Observe data requests when the parent session is a component."""
        assert self.parent is not None and self.xmlstream is not None
        if self.parent.is_component:
            self.xmlstream.addObserver(
                IQ_DATA_REQUEST,
                self.plugin_parent._on_component_data_request,
                client=self.parent,
            )

    def getDiscoInfo(
        self, requestor: jid.JID, target: jid.JID, nodeIdentifier: str = ""
    ) -> list[disco.DiscoFeature]:
        """Advertise the GRE namespace."""
        return [
            disco.DiscoFeature(NS_GRE),
        ]

    def getDiscoItems(
        self, requestor: jid.JID, target: jid.JID, nodeIdentifier: str = ""
    ) -> list[disco.DiscoItem]:
        """No disco item is advertised."""
        return []