comparison libervia/backend/plugins/plugin_sec_gre_formatter_mime.py @ 4345:07e87adb2f65

plugin GRE formatter MIME: implements GRE Formatter: MIME: rel 455
author Goffi <goffi@goffi.org>
date Mon, 13 Jan 2025 01:23:22 +0100
parents
children
comparison
equal deleted inserted replaced
4344:95f8309f86cf 4345:07e87adb2f65
1 #!/usr/bin/env python3
2
3 # Libervia plugin
4 # Copyright (C) 2009-2025 Jérôme Poisson (goffi@goffi.org)
5
6 # This program is free software: you can redistribute it and/or modify
7 # it under the terms of the GNU Affero General Public License as published by
8 # the Free Software Foundation, either version 3 of the License, or
9 # (at your option) any later version.
10
11 # This program is distributed in the hope that it will be useful,
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 # GNU Affero General Public License for more details.
15
16 # You should have received a copy of the GNU Affero General Public License
17 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18
19 from typing import Final, TYPE_CHECKING, cast
20
21 from twisted.words.protocols.jabber import jid
22 from twisted.words.protocols.jabber.xmlstream import XMPPHandler
23 from twisted.words.xish import domish
24 from wokkel import data_form, disco, iwokkel
25 from zope.interface import implementer
26
27 from libervia.backend.core import exceptions
28 from libervia.backend.core.constants import Const as C
29 from libervia.backend.core.core_types import SatXMPPEntity
30 from libervia.backend.core.i18n import _
31 from libervia.backend.core.log import getLogger
32 from libervia.backend.plugins.plugin_xep_0106 import XEP_0106
33 from .plugin_exp_gre import Formatter
34
35 from email.mime.multipart import MIMEMultipart
36 from email.mime.text import MIMEText
37
38 if TYPE_CHECKING:
39 from libervia.backend.core.main import LiberviaBackend
40
# Module-level logger, named after this module.
log = getLogger(__name__)


# Plugin metadata, keyed by the ``C.PI_*`` constants.
PLUGIN_INFO = {
    C.PI_NAME: "GRE Formatter: MIME",
    C.PI_IMPORT_NAME: "GRE-MIME",
    C.PI_TYPE: "XEP",
    C.PI_MODES: C.PLUG_MODE_BOTH,
    C.PI_PROTOCOLS: [],
    # "GRE" is the only declared dependency of this plugin.
    C.PI_DEPENDENCIES: ["GRE"],
    C.PI_RECOMMENDATIONS: [],
    # Name of the class defined below that is used as the plugin entry point.
    C.PI_MAIN: "GREFormatterMime",
    C.PI_HANDLER: "yes",
    C.PI_DESCRIPTION: _("Handle MIME formatting for Gateway Relayed Encryption."),
}

# Namespace of the MIME formatter; registered with the host and advertised as a
# disco feature further down in this module.
NS_GRE_MIME: Final = "urn:xmpp:gre:formatter:mime:0"
60
61
class GREFormatterMime(Formatter):
    """GRE formatter turning an outgoing ``<message>`` into a multipart MIME payload."""

    name = "mime"
    namespace = NS_GRE_MIME

    def __init__(self, host: "LiberviaBackend") -> None:
        log.info(f"plugin {PLUGIN_INFO[C.PI_NAME]!r} initialization")
        super().__init__(host)
        host.register_namespace("gre-mime", NS_GRE_MIME)

    def get_handler(self, client: SatXMPPEntity) -> XMPPHandler:
        """Return the handler advertising this formatter's namespace.

        @param client: Client session (unused here).
        @return: A new ``GREMIMEHandler`` bound to this plugin instance.
        """
        return GREMIMEHandler(self)

    async def format(
        self,
        client: SatXMPPEntity,
        recipient_id: str,
        message_elt: domish.Element,
        encryption_data_form: data_form.Form,
    ) -> bytes:
        """Format the sent stanza as multipart MIME payload.

        @param client: Client session.
        @param recipient_id: Identifier of the recipient, used as the ``To`` header.
        @param message_elt: <message> element being sent.
        @param encryption_data_form: Encryption data form; its ``sender_id`` value
            is used as the ``From`` header.
        @return: MIME formatted payload.
        @raise exceptions.InternalError: ``message_elt`` is not a <message> stanza.
        @raise exceptions.CancelError: ``message_elt`` has no <body> child.
        """
        if message_elt.name != "message":
            raise exceptions.InternalError("Only <message> stanza should be received.")

        # Only the lookup can raise StopIteration, so keep the ``try`` body minimal.
        try:
            body_elt = next(message_elt.elements(None, "body"))
        except StopIteration as e:
            error_msg = (
                f"Cancelling message sending due to missing body: {message_elt.toXml()}"
            )
            log.warning(error_msg)
            raise exceptions.CancelError(error_msg) from e
        body = str(body_elt)

        mime_msg = MIMEMultipart()
        # "sender_id" is the real email address used by the gateway for this user.
        mime_msg["From"] = encryption_data_form["sender_id"]
        mime_msg["To"] = recipient_id

        # The subject is optional: check explicitly for ``None`` instead of relying
        # on the truthiness of the element object.
        subject_elt = next(message_elt.elements(None, "subject"), None)
        if subject_elt is not None:
            mime_msg["Subject"] = str(subject_elt)

        mime_msg.attach(MIMEText(body))

        return mime_msg.as_bytes()
109
110
@implementer(iwokkel.IDisco)
class GREMIMEHandler(XMPPHandler):
    """Service discovery handler advertising the GRE MIME formatter namespace."""

    def __init__(self, plugin_parent):
        # Let the base handler set up its own state before adding ours.
        super().__init__()
        self.plugin_parent = plugin_parent

    def getDiscoInfo(
        self, requestor: jid.JID, target: jid.JID, nodeIdentifier: str = ""
    ) -> list[disco.DiscoFeature]:
        """Return the disco feature for the GRE MIME formatter.

        The arguments are not used: the same single feature is always returned.
        """
        return [
            disco.DiscoFeature(NS_GRE_MIME),
        ]

    def getDiscoItems(
        self, requestor: jid.JID, target: jid.JID, nodeIdentifier: str = ""
    ) -> list[disco.DiscoItem]:
        """Return an empty list: this handler exposes no disco items."""
        return []