Mercurial > libervia-backend
view libervia/backend/plugins/plugin_sec_gre_formatter_mime.py @ 4348:35d41de5b2aa default tip @
doc (component): document use of Gateway Relayed Encryption:
fix 455
author | Goffi <goffi@goffi.org> |
---|---|
date | Mon, 13 Jan 2025 01:23:22 +0100 |
parents | 07e87adb2f65 |
children |
line wrap: on
line source
#!/usr/bin/env python3 # Libervia plugin # Copyright (C) 2009-2025 Jérôme Poisson (goffi@goffi.org) # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. from typing import Final, TYPE_CHECKING, cast from twisted.words.protocols.jabber import jid from twisted.words.protocols.jabber.xmlstream import XMPPHandler from twisted.words.xish import domish from wokkel import data_form, disco, iwokkel from zope.interface import implementer from libervia.backend.core import exceptions from libervia.backend.core.constants import Const as C from libervia.backend.core.core_types import SatXMPPEntity from libervia.backend.core.i18n import _ from libervia.backend.core.log import getLogger from libervia.backend.plugins.plugin_xep_0106 import XEP_0106 from .plugin_exp_gre import Formatter from email.mime.multipart import MIMEMultipart from email.mime.text import MIMEText if TYPE_CHECKING: from libervia.backend.core.main import LiberviaBackend log = getLogger(__name__) PLUGIN_INFO = { C.PI_NAME: "GRE Formatter: MIME", C.PI_IMPORT_NAME: "GRE-MIME", C.PI_TYPE: "XEP", C.PI_MODES: C.PLUG_MODE_BOTH, C.PI_PROTOCOLS: [], C.PI_DEPENDENCIES: [ "GRE", ], C.PI_RECOMMENDATIONS: [], C.PI_MAIN: "GREFormatterMime", C.PI_HANDLER: "yes", C.PI_DESCRIPTION: _("Handle MIME formatting for Gateway Relayed Encryption."), } NS_GRE_MIME: Final = "urn:xmpp:gre:formatter:mime:0" class GREFormatterMime(Formatter): name = "mime" namespace = NS_GRE_MIME def __init__(self, host: "LiberviaBackend") -> None: log.info(f"plugin {PLUGIN_INFO[C.PI_NAME]!r} initialization") super().__init__(host) host.register_namespace("gre-mime", NS_GRE_MIME) def get_handler(self, client: SatXMPPEntity) -> XMPPHandler: return GREMIMEHandler(self) async def format( self, client: SatXMPPEntity, recipient_id: str, message_elt: domish.Element, encryption_data_form: data_form.Form, ) -> bytes: """Format the sent stanza as multipart MIME payload. @param client: Client session. @param message_elt: <message> element being sent. @return: MIME formatted payload. """ if message_elt.name != "message": raise exceptions.InternalError("Only <message> stanza should be received.") try: body = str(next(message_elt.elements(None, "body"))) except StopIteration as e: msg = f"Cancelling message sending due to missing body: {message_elt.toXml()}" log.warning(msg) raise exceptions.CancelError(msg) from e msg = MIMEMultipart() # "sender_id" is the real email address used by the gateway for this user. msg["From"] = encryption_data_form["sender_id"] msg["To"] = recipient_id subject_elt = next(message_elt.elements(None, "subject"), None) if subject_elt: msg["Subject"] = str(subject_elt) msg.attach(MIMEText(body)) return msg.as_bytes() @implementer(iwokkel.IDisco) class GREMIMEHandler(XMPPHandler): def __init__(self, plugin_parent): self.plugin_parent = plugin_parent def getDiscoInfo( self, requestor: jid.JID, target: jid.JID, nodeIdentifier: str = "" ) -> list[disco.DiscoFeature]: return [ disco.DiscoFeature(NS_GRE_MIME), ] def getDiscoItems( self, requestor: jid.JID, target: jid.JID, nodeIdentifier: str = "" ) -> list[disco.DiscoItems]: return []