view libervia/backend/plugins/plugin_sec_gre_formatter_mime.py @ 4348:35d41de5b2aa default tip @

doc (component): document use of Gateway Relayed Encryption: fix 455
author Goffi <goffi@goffi.org>
date Mon, 13 Jan 2025 01:23:22 +0100
parents 07e87adb2f65
children
line wrap: on
line source

#!/usr/bin/env python3

# Libervia plugin
# Copyright (C) 2009-2025 Jérôme Poisson (goffi@goffi.org)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

from typing import Final, TYPE_CHECKING, cast

from twisted.words.protocols.jabber import jid
from twisted.words.protocols.jabber.xmlstream import XMPPHandler
from twisted.words.xish import domish
from wokkel import data_form, disco, iwokkel
from zope.interface import implementer

from libervia.backend.core import exceptions
from libervia.backend.core.constants import Const as C
from libervia.backend.core.core_types import SatXMPPEntity
from libervia.backend.core.i18n import _
from libervia.backend.core.log import getLogger
from libervia.backend.plugins.plugin_xep_0106 import XEP_0106
from .plugin_exp_gre import Formatter

from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText

if TYPE_CHECKING:
    from libervia.backend.core.main import LiberviaBackend

log = getLogger(__name__)


PLUGIN_INFO = {
    C.PI_NAME: "GRE Formatter: MIME",
    C.PI_IMPORT_NAME: "GRE-MIME",
    C.PI_TYPE: "XEP",
    C.PI_MODES: C.PLUG_MODE_BOTH,
    C.PI_PROTOCOLS: [],
    C.PI_DEPENDENCIES: [
        "GRE",
    ],
    C.PI_RECOMMENDATIONS: [],
    C.PI_MAIN: "GREFormatterMime",
    C.PI_HANDLER: "yes",
    C.PI_DESCRIPTION: _("Handle MIME formatting for Gateway Relayed Encryption."),
}

NS_GRE_MIME: Final = "urn:xmpp:gre:formatter:mime:0"


class GREFormatterMime(Formatter):
    name = "mime"
    namespace = NS_GRE_MIME

    def __init__(self, host: "LiberviaBackend") -> None:
        log.info(f"plugin {PLUGIN_INFO[C.PI_NAME]!r} initialization")
        super().__init__(host)
        host.register_namespace("gre-mime", NS_GRE_MIME)

    def get_handler(self, client: SatXMPPEntity) -> XMPPHandler:
        return GREMIMEHandler(self)

    async def format(
        self,
        client: SatXMPPEntity,
        recipient_id: str,
        message_elt: domish.Element,
        encryption_data_form: data_form.Form,
    ) -> bytes:
        """Format the sent stanza as multipart MIME payload.

        @param client: Client session.
        @param message_elt: <message> element being sent.
        @return: MIME formatted payload.
        """
        if message_elt.name != "message":
            raise exceptions.InternalError("Only <message> stanza should be received.")

        try:
            body = str(next(message_elt.elements(None, "body")))
        except StopIteration as e:
            msg = f"Cancelling message sending due to missing body: {message_elt.toXml()}"
            log.warning(msg)
            raise exceptions.CancelError(msg) from e

        msg = MIMEMultipart()
        # "sender_id" is the real email address used by the gateway for this user.
        msg["From"] = encryption_data_form["sender_id"]
        msg["To"] = recipient_id

        subject_elt = next(message_elt.elements(None, "subject"), None)
        if subject_elt:
            msg["Subject"] = str(subject_elt)

        msg.attach(MIMEText(body))

        return msg.as_bytes()


@implementer(iwokkel.IDisco)
class GREMIMEHandler(XMPPHandler):

    def __init__(self, plugin_parent):
        self.plugin_parent = plugin_parent

    def getDiscoInfo(
        self, requestor: jid.JID, target: jid.JID, nodeIdentifier: str = ""
    ) -> list[disco.DiscoFeature]:
        return [
            disco.DiscoFeature(NS_GRE_MIME),
        ]

    def getDiscoItems(
        self, requestor: jid.JID, target: jid.JID, nodeIdentifier: str = ""
    ) -> list[disco.DiscoItems]:
        return []