comparison libervia/backend/plugins/plugin_sec_gre_encrypter_openpgp.py @ 4346:62746042e6d9

plugin gre encrypter: implement GRE Encrypter: OpenPGP: rel 455
author Goffi <goffi@goffi.org>
date Mon, 13 Jan 2025 01:23:22 +0100
parents
children
comparison
equal deleted inserted replaced
4345:07e87adb2f65 4346:62746042e6d9
1 #!/usr/bin/env python3
2
3 # Libervia plugin
4 # Copyright (C) 2009-2025 Jérôme Poisson (goffi@goffi.org)
5
6 # This program is free software: you can redistribute it and/or modify
7 # it under the terms of the GNU Affero General Public License as published by
8 # the Free Software Foundation, either version 3 of the License, or
9 # (at your option) any later version.
10
11 # This program is distributed in the hope that it will be useful,
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 # GNU Affero General Public License for more details.
15
16 # You should have received a copy of the GNU Affero General Public License
17 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18
19 import base64
20 from typing import Final, TYPE_CHECKING, cast
21
22 from twisted.words.protocols.jabber import jid
23 from twisted.words.protocols.jabber.xmlstream import XMPPHandler
24 from twisted.words.xish import domish
25 from wokkel import data_form, disco, iwokkel
26 from zope.interface import implementer
27
28 from libervia.backend.core import exceptions
29 from libervia.backend.core.constants import Const as C
30 from libervia.backend.core.core_types import SatXMPPEntity
31 from libervia.backend.core.i18n import _
32 from libervia.backend.core.log import getLogger
33 from libervia.backend.plugins import plugin_xep_0373
34 from .plugin_exp_gre import Encrypter
35
36 if TYPE_CHECKING:
37 from libervia.backend.core.main import LiberviaBackend
38
39 log = getLogger(__name__)
40
41
42 PLUGIN_INFO = {
43 C.PI_NAME: "GRE Encrypter: OpenPGP",
44 C.PI_IMPORT_NAME: "GRE-OpenPGP",
45 C.PI_TYPE: "XEP",
46 C.PI_MODES: C.PLUG_MODE_BOTH,
47 C.PI_PROTOCOLS: [],
48 C.PI_DEPENDENCIES: [
49 "GRE",
50 ],
51 C.PI_RECOMMENDATIONS: [],
52 C.PI_MAIN: "GREEncrypterOpenPGP",
53 C.PI_HANDLER: "yes",
54 C.PI_DESCRIPTION: _("Handle MIME formatting for Gateway Relayed Encryption."),
55 }
56
57 NS_GRE_OPENPGP: Final = "urn:xmpp:gre:encrypter:openpgp:0"
58
59
60 class GREEncrypterOpenPGP(Encrypter):
61 name = "openpgp"
62 namespace = NS_GRE_OPENPGP
63
64 def __init__(self, host: "LiberviaBackend") -> None:
65 log.info(f"plugin {PLUGIN_INFO[C.PI_NAME]!r} initialization")
66 super().__init__(host)
67 host.register_namespace("gre-openpgp", NS_GRE_OPENPGP)
68
69 def get_handler(self, client: SatXMPPEntity) -> XMPPHandler:
70 return GREMIMEHandler(self)
71
72 async def encrypt(
73 self,
74 client: SatXMPPEntity,
75 recipient_id: str,
76 message_elt: domish.Element,
77 formatted_payload: bytes,
78 encryption_data_form: data_form.Form,
79 ) -> str:
80 gpg_provider = plugin_xep_0373.get_gpg_provider(self.host, client)
81 public_keys = gpg_provider.list_public_keys(recipient_id)
82 if not public_keys:
83 raise exceptions.NotFound(
84 f"No public keys found for {recipient_id!r}, we can't encrypt."
85 )
86 encrypted_data = gpg_provider.encrypt(
87 formatted_payload, public_keys
88 )
89 return base64.b64encode(encrypted_data).decode("ASCII")
90
91
92 @implementer(iwokkel.IDisco)
93 class GREMIMEHandler(XMPPHandler):
94
95 def __init__(self, plugin_parent):
96 self.plugin_parent = plugin_parent
97
98 def getDiscoInfo(
99 self, requestor: jid.JID, target: jid.JID, nodeIdentifier: str = ""
100 ) -> list[disco.DiscoFeature]:
101 return [
102 disco.DiscoFeature(NS_GRE_OPENPGP),
103 ]
104
105 def getDiscoItems(
106 self, requestor: jid.JID, target: jid.JID, nodeIdentifier: str = ""
107 ) -> list[disco.DiscoItems]:
108 return []