# HG changeset patch # User Goffi # Date 1736727802 -3600 # Node ID 07e87adb2f65552f864f890e94183d8ce18a41ee # Parent 95f8309f86cf844784851af28eb912672728834a plugin GRE formatter MIME: implements GRE Formatter: MIME: rel 455 diff -r 95f8309f86cf -r 07e87adb2f65 libervia/backend/plugins/plugin_sec_gre_formatter_mime.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/libervia/backend/plugins/plugin_sec_gre_formatter_mime.py Mon Jan 13 01:23:22 2025 +0100 @@ -0,0 +1,127 @@ +#!/usr/bin/env python3 + +# Libervia plugin +# Copyright (C) 2009-2025 Jérôme Poisson (goffi@goffi.org) + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + +from typing import Final, TYPE_CHECKING, cast + +from twisted.words.protocols.jabber import jid +from twisted.words.protocols.jabber.xmlstream import XMPPHandler +from twisted.words.xish import domish +from wokkel import data_form, disco, iwokkel +from zope.interface import implementer + +from libervia.backend.core import exceptions +from libervia.backend.core.constants import Const as C +from libervia.backend.core.core_types import SatXMPPEntity +from libervia.backend.core.i18n import _ +from libervia.backend.core.log import getLogger +from libervia.backend.plugins.plugin_xep_0106 import XEP_0106 +from .plugin_exp_gre import Formatter + +from email.mime.multipart import MIMEMultipart +from email.mime.text import MIMEText + +if TYPE_CHECKING: + from libervia.backend.core.main import LiberviaBackend + +log = getLogger(__name__) + + +PLUGIN_INFO = { + C.PI_NAME: "GRE Formatter: MIME", + C.PI_IMPORT_NAME: "GRE-MIME", + C.PI_TYPE: "XEP", + C.PI_MODES: C.PLUG_MODE_BOTH, + C.PI_PROTOCOLS: [], + C.PI_DEPENDENCIES: [ + "GRE", + ], + C.PI_RECOMMENDATIONS: [], + C.PI_MAIN: "GREFormatterMime", + C.PI_HANDLER: "yes", + C.PI_DESCRIPTION: _("Handle MIME formatting for Gateway Relayed Encryption."), +} + +NS_GRE_MIME: Final = "urn:xmpp:gre:formatter:mime:0" + + +class GREFormatterMime(Formatter): + name = "mime" + namespace = NS_GRE_MIME + + def __init__(self, host: "LiberviaBackend") -> None: + log.info(f"plugin {PLUGIN_INFO[C.PI_NAME]!r} initialization") + super().__init__(host) + host.register_namespace("gre-mime", NS_GRE_MIME) + + def get_handler(self, client: SatXMPPEntity) -> XMPPHandler: + return GREMIMEHandler(self) + + async def format( + self, + client: SatXMPPEntity, + recipient_id: str, + message_elt: domish.Element, + encryption_data_form: data_form.Form, + ) -> bytes: + """Format the sent stanza as multipart MIME payload. + + @param client: Client session. + @param message_elt: element being sent. + @return: MIME formatted payload. + """ + if message_elt.name != "message": + raise exceptions.InternalError("Only stanza should be received.") + + try: + body = str(next(message_elt.elements(None, "body"))) + except StopIteration as e: + msg = f"Cancelling message sending due to missing body: {message_elt.toXml()}" + log.warning(msg) + raise exceptions.CancelError(msg) from e + + msg = MIMEMultipart() + # "sender_id" is the real email address used by the gateway for this user. + msg["From"] = encryption_data_form["sender_id"] + msg["To"] = recipient_id + + subject_elt = next(message_elt.elements(None, "subject"), None) + if subject_elt: + msg["Subject"] = str(subject_elt) + + msg.attach(MIMEText(body)) + + return msg.as_bytes() + + +@implementer(iwokkel.IDisco) +class GREMIMEHandler(XMPPHandler): + + def __init__(self, plugin_parent): + self.plugin_parent = plugin_parent + + def getDiscoInfo( + self, requestor: jid.JID, target: jid.JID, nodeIdentifier: str = "" + ) -> list[disco.DiscoFeature]: + return [ + disco.DiscoFeature(NS_GRE_MIME), + ] + + def getDiscoItems( + self, requestor: jid.JID, target: jid.JID, nodeIdentifier: str = "" + ) -> list[disco.DiscoItems]: + return []