Mercurial > libervia-backend
view libervia/backend/plugins/plugin_exp_gre.py @ 4351:6a0a081485b8
plugin autocrypt: Autocrypt protocol implementation:
Implementation of autocrypt: `autocrypt` header is checked, and if present and no public
key is known for the peer, the key is imported.
`autocrypt` header is also added to outgoing message (only if an email gateway is
detected).
For the moment, the JID is use as identifier, but the real email used by gateway should be
used in the future.
rel 456
author | Goffi <goffi@goffi.org> |
---|---|
date | Fri, 28 Feb 2025 09:23:35 +0100 |
parents | 95f8309f86cf |
children |
line wrap: on
line source
#!/usr/bin/env python3 # Libervia plugin # Copyright (C) 2009-2025 Jérôme Poisson (goffi@goffi.org) # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. from abc import ABC, abstractmethod from typing import Final, TYPE_CHECKING, Self, Type, cast from twisted.internet import defer from twisted.words.protocols.jabber import jid, error as jabber_error from twisted.words.protocols.jabber import xmlstream from twisted.words.protocols.jabber.xmlstream import XMPPHandler from twisted.words.xish import domish from wokkel import data_form, disco, iwokkel from zope.interface import implementer from libervia.backend.core import exceptions from libervia.backend.core.constants import Const as C from libervia.backend.core.core_types import SatXMPPEntity from libervia.backend.core.i18n import _ from libervia.backend.core.log import getLogger from libervia.backend.plugins.plugin_xep_0106 import XEP_0106 from libervia.backend.tools import xml_tools if TYPE_CHECKING: from libervia.backend.core.main import LiberviaBackend log = getLogger(__name__) PLUGIN_INFO = { C.PI_NAME: "Gateway Relayer Encryption", C.PI_IMPORT_NAME: "GRE", C.PI_TYPE: "XEP", C.PI_MODES: C.PLUG_MODE_BOTH, C.PI_PROTOCOLS: [], C.PI_DEPENDENCIES: ["XEP-0106"], C.PI_RECOMMENDATIONS: [], C.PI_MAIN: "GRE", C.PI_HANDLER: "yes", C.PI_DESCRIPTION: _( "Handle formatting and encryption to support end-to-end encryption with gateways." ), } NS_GRE_PREFIX: Final = "urn:xmpp:gre:" NS_GRE: Final = f"{NS_GRE_PREFIX}0" NS_GRE_FORMATTER_PREFIX: Final = f"{NS_GRE_PREFIX}formatter:" NS_GRE_ENCRYPTER_PREFIX: Final = f"{NS_GRE_PREFIX}encrypter:" NS_GRE_DATA: Final = f"{NS_GRE_PREFIX}data" IQ_DATA_REQUEST = C.IQ_GET + '/data[@xmlns="' + NS_GRE + '"]' class Formatter(ABC): formatters_classes: dict[str, Type[Self]] = {} name: str = "" namespace: str = "" _instance: Self | None = None def __init_subclass__(cls, **kwargs) -> None: """ Registers the subclass in the formatters dictionary. @param kwargs: Additional keyword arguments. """ assert cls.name and cls.namespace, "name and namespace must be set" super().__init_subclass__(**kwargs) cls.formatters_classes[cls.namespace] = cls def __init__(self, host: "LiberviaBackend") -> None: assert self.__class__._instance is None, "Formatter class must be singleton." self.__class__._instance = self self.host = host @classmethod def get_instance(cls) -> Self: if cls._instance is None: raise exceptions.InternalError("Formatter instance should be set.") return cls._instance @abstractmethod async def format( self, client: SatXMPPEntity, recipient_id: str, message_elt: domish.Element, encryption_data_form: data_form.Form, ) -> bytes: raise NotImplementedError class Encrypter(ABC): encrypters_classes: dict[str, Type[Self]] = {} name: str = "" namespace: str = "" _instance: Self | None = None def __init_subclass__(cls, **kwargs) -> None: """ Registers the subclass in the encrypters dictionary. @param kwargs: Additional keyword arguments. """ assert cls.name and cls.namespace, "name and namespace must be set" super().__init_subclass__(**kwargs) cls.encrypters_classes[cls.namespace] = cls def __init__(self, host: "LiberviaBackend") -> None: assert self.__class__._instance is None, "Encrypter class must be singleton." self.__class__._instance = self self.host = host @classmethod def get_instance(cls) -> Self: if cls._instance is None: raise exceptions.InternalError("Encrypter instance should be set.") return cls._instance @abstractmethod async def encrypt( self, client: SatXMPPEntity, recipient_id: str, message_elt: domish.Element, formatted_payload: bytes, encryption_data_form: data_form.Form, ) -> str: raise NotImplementedError class GetDataHandler(ABC): gre_formatters: list[str] = [] gre_encrypters: list[str] = [] def __init_subclass__(cls, **kwargs): super().__init_subclass__(**kwargs) if not cls.gre_formatters or not cls.gre_encrypters: raise TypeError( f'{cls.__name__} must define "gre_formatters" and "gre_encrypters"' ) @abstractmethod async def on_relayed_encryption_data( self, client: SatXMPPEntity, iq_elt: domish.Element, form: data_form.Form ) -> None: raise NotImplementedError class GRE: namespace = NS_GRE def __init__(self, host: "LiberviaBackend") -> None: log.info(f"plugin {PLUGIN_INFO[C.PI_NAME]!r} initialization") self.host = host self._e = cast(XEP_0106, host.plugins["XEP-0106"]) self._data_handlers: dict[SatXMPPEntity, GetDataHandler] = {} host.register_namespace("gre", NS_GRE) self.host.register_encryption_plugin(self, "Relayed", NS_GRE) host.trigger.add("send", self.send_trigger, priority=0) def register_get_data_handler( self, client: SatXMPPEntity, handler: GetDataHandler ) -> None: if client in self._data_handlers: raise exceptions.InternalError( '"register_get_data_handler" should not be called twice for the same ' "handler." ) self._data_handlers[client] = handler def _on_component_data_request( self, iq_elt: domish.Element, client: SatXMPPEntity ) -> None: iq_elt.handled = True defer.ensureDeferred(self.on_component_data_request(client, iq_elt)) async def on_component_data_request( self, client: SatXMPPEntity, iq_elt: domish.Element ) -> None: form = data_form.Form( "result", "Relayed Data Encryption", formNamespace=NS_GRE_DATA ) try: handler = self._data_handlers[client] except KeyError: pass else: await handler.on_relayed_encryption_data(client, iq_elt, form) iq_result_elt = xmlstream.toResponse(iq_elt, "result") data_elt = iq_result_elt.addElement((NS_GRE, "data")) data_elt.addChild(form.toElement()) client.send(iq_result_elt) async def get_formatter_and_encrypter( self, client: SatXMPPEntity, gateway_jid: jid.JID ) -> tuple[Formatter, Encrypter]: """Retrieve Formatter and Encrypter instances for given gateway. @param client: client session. @param gateway_jid: bare jid of the gateway. @return: Formatter and Encrypter instances. @raise exceptions.FeatureNotFound: No relevant Formatter or Encrypter could be found. """ disco_infos = await self.host.memory.disco.get_infos(client, gateway_jid) try: formatter_ns = next( f for f in disco_infos.features if f.startswith(NS_GRE_FORMATTER_PREFIX) ) encrypter_ns = next( f for f in disco_infos.features if f.startswith(NS_GRE_ENCRYPTER_PREFIX) ) formatter_cls = Formatter.formatters_classes[formatter_ns] encrypter_cls = Encrypter.encrypters_classes[encrypter_ns] except StopIteration as e: raise exceptions.FeatureNotFound("No relayed encryption found.") from e except KeyError as e: raise exceptions.FeatureNotFound( "No compatible relayed encryption found." ) from e return formatter_cls.get_instance(), encrypter_cls.get_instance() def get_encrypted_payload( self, message_elt: domish.Element, ) -> str | None: """Return encrypted payload if any. @param message_elt: The message element. @return: Encrypted payload if any, None otherwise. """ encrypted_elt = next(message_elt.elements(NS_GRE, "encrypted"), None) if encrypted_elt is None: return None return str(encrypted_elt) async def send_trigger( self, client: SatXMPPEntity, stanza_elt: domish.Element ) -> bool: """ @param client: Profile session. @param stanza: The stanza that is about to be sent. @return: Whether the send message flow should continue or not. """ if stanza_elt.name != "message": return True try: recipient = jid.JID(stanza_elt["to"]) except (jabber_error.StanzaError, RuntimeError, jid.InvalidFormat) as e: raise exceptions.InternalError( "Message without recipient encountered. Blocking further processing to" f" avoid leaking plaintext data: {stanza_elt.toXml()}" ) from e recipient_bare_jid = recipient.userhostJID() encryption_session = client.encryption.getSession(recipient_bare_jid) if encryption_session is None: return True if encryption_session["plugin"].namespace != NS_GRE: return True # We are in a relayed encryption session. encryption_data_form = await self.get_data(client, recipient_bare_jid) formatter, encrypter = await self.get_formatter_and_encrypter( client, recipient_bare_jid ) try: recipient_id = self._e.unescape(recipient.user) except ValueError as e: raise exceptions.DataError('"to" attribute is not in expected fomat') from e formatted_payload = await formatter.format( client, recipient_id, stanza_elt, encryption_data_form ) encrypted_payload = await encrypter.encrypt( client, recipient_id, stanza_elt, formatted_payload, encryption_data_form ) for body_elt in list(stanza_elt.elements(None, "body")): stanza_elt.children.remove(body_elt) for subject_elt in list(stanza_elt.elements(None, "subject")): stanza_elt.children.remove(subject_elt) encrypted_elt = stanza_elt.addElement( (NS_GRE, "encrypted"), content=encrypted_payload ) encrypted_elt["formatter"] = formatter.namespace encrypted_elt["encrypter"] = encrypter.namespace return True async def get_data( self, client: SatXMPPEntity, recipient_jid: jid.JID ) -> data_form.Form: """Retrieve relayed encryption data form. @param client: Client session. @param recipient_id: Bare jid of the entity to whom we want to send encrypted mesasge. @return: Found data form, or None if no data form has been found. """ assert recipient_jid.resource is None, "recipient_jid must be a bare jid." iq_elt = client.IQ("get") iq_elt["to"] = recipient_jid.full() data_elt = iq_elt.addElement((NS_GRE, "data")) iq_result_elt = await iq_elt.send() try: data_elt = next(iq_result_elt.elements(NS_GRE, "data")) except StopIteration: raise exceptions.DataError( f"Relayed data payload is missing: {iq_result_elt.toXml()}" ) form = data_form.findForm(data_elt, NS_GRE_DATA) if form is None: raise exceptions.DataError( f"Relayed data form is missing: {iq_result_elt.toXml()}" ) return form async def get_trust_ui( self, client: SatXMPPEntity, entity: jid.JID ) -> xml_tools.XMLUI: """ @param client: The client session. @param entity: The entity whose device trust levels to manage. @return: An XMLUI Dialog to handle trust for given entity. """ # We just return an enmpty form for now. return xml_tools.XMLUI(C.XMLUI_FORM) def get_handler(self, client: SatXMPPEntity) -> XMPPHandler: return GREHandler(self) @implementer(iwokkel.IDisco) class GREHandler(XMPPHandler): def __init__(self, plugin_parent: GRE) -> None: self.plugin_parent = plugin_parent def connectionInitialized(self): assert self.parent is not None and self.xmlstream is not None if self.parent.is_component: self.xmlstream.addObserver( IQ_DATA_REQUEST, self.plugin_parent._on_component_data_request, client=self.parent, ) def getDiscoInfo( self, requestor: jid.JID, target: jid.JID, nodeIdentifier: str = "" ) -> list[disco.DiscoFeature]: return [ disco.DiscoFeature(NS_GRE), ] def getDiscoItems( self, requestor: jid.JID, target: jid.JID, nodeIdentifier: str = "" ) -> list[disco.DiscoItems]: return []