Mercurial > libervia-backend
comparison libervia/backend/plugins/plugin_sec_gre_encrypter_openpgp.py @ 4346:62746042e6d9
plugin gre encrypter: implement GRE Encrypter: OpenPGP:
rel 455
author | Goffi <goffi@goffi.org> |
---|---|
date | Mon, 13 Jan 2025 01:23:22 +0100 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
4345:07e87adb2f65 | 4346:62746042e6d9 |
---|---|
1 #!/usr/bin/env python3 | |
2 | |
3 # Libervia plugin | |
4 # Copyright (C) 2009-2025 Jérôme Poisson (goffi@goffi.org) | |
5 | |
6 # This program is free software: you can redistribute it and/or modify | |
7 # it under the terms of the GNU Affero General Public License as published by | |
8 # the Free Software Foundation, either version 3 of the License, or | |
9 # (at your option) any later version. | |
10 | |
11 # This program is distributed in the hope that it will be useful, | |
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of | |
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |
14 # GNU Affero General Public License for more details. | |
15 | |
16 # You should have received a copy of the GNU Affero General Public License | |
17 # along with this program. If not, see <http://www.gnu.org/licenses/>. | |
18 | |
19 import base64 | |
20 from typing import Final, TYPE_CHECKING, cast | |
21 | |
22 from twisted.words.protocols.jabber import jid | |
23 from twisted.words.protocols.jabber.xmlstream import XMPPHandler | |
24 from twisted.words.xish import domish | |
25 from wokkel import data_form, disco, iwokkel | |
26 from zope.interface import implementer | |
27 | |
28 from libervia.backend.core import exceptions | |
29 from libervia.backend.core.constants import Const as C | |
30 from libervia.backend.core.core_types import SatXMPPEntity | |
31 from libervia.backend.core.i18n import _ | |
32 from libervia.backend.core.log import getLogger | |
33 from libervia.backend.plugins import plugin_xep_0373 | |
34 from .plugin_exp_gre import Encrypter | |
35 | |
36 if TYPE_CHECKING: | |
37 from libervia.backend.core.main import LiberviaBackend | |
38 | |
39 log = getLogger(__name__) | |
40 | |
41 | |
42 PLUGIN_INFO = { | |
43 C.PI_NAME: "GRE Encrypter: OpenPGP", | |
44 C.PI_IMPORT_NAME: "GRE-OpenPGP", | |
45 C.PI_TYPE: "XEP", | |
46 C.PI_MODES: C.PLUG_MODE_BOTH, | |
47 C.PI_PROTOCOLS: [], | |
48 C.PI_DEPENDENCIES: [ | |
49 "GRE", | |
50 ], | |
51 C.PI_RECOMMENDATIONS: [], | |
52 C.PI_MAIN: "GREEncrypterOpenPGP", | |
53 C.PI_HANDLER: "yes", | |
54 C.PI_DESCRIPTION: _("Handle MIME formatting for Gateway Relayed Encryption."), | |
55 } | |
56 | |
57 NS_GRE_OPENPGP: Final = "urn:xmpp:gre:encrypter:openpgp:0" | |
58 | |
59 | |
60 class GREEncrypterOpenPGP(Encrypter): | |
61 name = "openpgp" | |
62 namespace = NS_GRE_OPENPGP | |
63 | |
64 def __init__(self, host: "LiberviaBackend") -> None: | |
65 log.info(f"plugin {PLUGIN_INFO[C.PI_NAME]!r} initialization") | |
66 super().__init__(host) | |
67 host.register_namespace("gre-openpgp", NS_GRE_OPENPGP) | |
68 | |
69 def get_handler(self, client: SatXMPPEntity) -> XMPPHandler: | |
70 return GREMIMEHandler(self) | |
71 | |
72 async def encrypt( | |
73 self, | |
74 client: SatXMPPEntity, | |
75 recipient_id: str, | |
76 message_elt: domish.Element, | |
77 formatted_payload: bytes, | |
78 encryption_data_form: data_form.Form, | |
79 ) -> str: | |
80 gpg_provider = plugin_xep_0373.get_gpg_provider(self.host, client) | |
81 public_keys = gpg_provider.list_public_keys(recipient_id) | |
82 if not public_keys: | |
83 raise exceptions.NotFound( | |
84 f"No public keys found for {recipient_id!r}, we can't encrypt." | |
85 ) | |
86 encrypted_data = gpg_provider.encrypt( | |
87 formatted_payload, public_keys | |
88 ) | |
89 return base64.b64encode(encrypted_data).decode("ASCII") | |
90 | |
91 | |
92 @implementer(iwokkel.IDisco) | |
93 class GREMIMEHandler(XMPPHandler): | |
94 | |
95 def __init__(self, plugin_parent): | |
96 self.plugin_parent = plugin_parent | |
97 | |
98 def getDiscoInfo( | |
99 self, requestor: jid.JID, target: jid.JID, nodeIdentifier: str = "" | |
100 ) -> list[disco.DiscoFeature]: | |
101 return [ | |
102 disco.DiscoFeature(NS_GRE_OPENPGP), | |
103 ] | |
104 | |
105 def getDiscoItems( | |
106 self, requestor: jid.JID, target: jid.JID, nodeIdentifier: str = "" | |
107 ) -> list[disco.DiscoItems]: | |
108 return [] |