comparison libervia/cli/cmd_encryption.py @ 4075:47401850dec6

refactoring: rename `libervia.frontends.jp` to `libervia.cli`
author Goffi <goffi@goffi.org>
date Fri, 02 Jun 2023 14:54:26 +0200
parents libervia/frontends/jp/cmd_encryption.py@26b7ed2817da
children 0d7bb4df2343
comparison
equal deleted inserted replaced
4074:26b7ed2817da 4075:47401850dec6
1 #!/usr/bin/env python3
2
3
4 # Libervia CLI
5 # Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org)
6
7 # This program is free software: you can redistribute it and/or modify
8 # it under the terms of the GNU Affero General Public License as published by
9 # the Free Software Foundation, either version 3 of the License, or
10 # (at your option) any later version.
11
12 # This program is distributed in the hope that it will be useful,
13 # but WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 # GNU Affero General Public License for more details.
16
17 # You should have received a copy of the GNU Affero General Public License
18 # along with this program. If not, see <http://www.gnu.org/licenses/>.
19
20 from libervia.cli import base
21 from libervia.cli.constants import Const as C
22 from libervia.backend.core.i18n import _
23 from libervia.backend.tools.common import data_format
24 from libervia.cli import xmlui_manager
25
# Names of the top-level command classes exposed by this module; "Encryption"
# is the class defined at the end of the file.
# NOTE(review): presumably read by the CLI command loader — confirm against base.
__commands__ = ["Encryption"]
27
28
class EncryptionAlgorithms(base.CommandBase):
    """``algorithms`` subcommand: show the encryption plugins returned by the bridge."""

    def __init__(self, host):
        super().__init__(
            host,
            "algorithms",
            use_output=C.OUTPUT_LIST_DICT,
            extra_outputs={"default": self.default_output},
            use_profile=False,
            help=_("show available encryption algorithms"),
        )

    def add_parser_options(self):
        # this subcommand declares no argument of its own
        pass

    def default_output(self, plugins):
        """Print the plugin names on one line, separated by commas.

        @param plugins: sequence of mappings, each one indexed with the "name" key
        """
        if plugins:
            names = ', '.join(plugin['name'] for plugin in plugins)
            self.disp(
                _("Following encryption algorithms are available: {algos}").format(
                    algos=names))
        else:
            self.disp(_("No encryption plugin registered!"))

    async def start(self):
        """Retrieve the serialised plugin list from the bridge and output it."""
        try:
            serialised = await self.host.bridge.encryption_plugins_get()
            # type_check=list is forwarded to the deserialiser as-is
            available = data_format.deserialise(serialised, type_check=list)
        except Exception as e:
            self.disp(f"can't retrieve plugins: {e}", error=True)
            self.host.quit(C.EXIT_BRIDGE_ERRBACK)
            return

        await self.output(available)
        self.host.quit()
60
61
class EncryptionGet(base.CommandBase):
    """``get`` subcommand: output the encryption session data for one entity."""

    def __init__(self, host):
        super().__init__(
            host,
            "get",
            use_output=C.OUTPUT_DICT,
            help=_("get encryption session data"),
        )

    def add_parser_options(self):
        self.parser.add_argument("jid", help=_("jid of the entity to check"))

    async def start(self):
        """Ask the bridge for the session of the given jid and output it.

        NOTE(review): the statements following each ``self.host.quit(...)`` call
        only make sense if ``quit`` does not return — confirm in ``base``.
        """
        checked = await self.host.check_jids([self.args.jid])
        entity = checked[0]
        try:
            serialised = await self.host.bridge.message_encryption_get(
                entity, self.profile)
        except Exception as e:
            self.disp(f"can't get session: {e}", error=True)
            self.host.quit(C.EXIT_BRIDGE_ERRBACK)

        session_data = data_format.deserialise(serialised)
        # a None payload is reported as "no session" with a dedicated exit code
        if session_data is None:
            self.disp(
                "No encryption session found, the messages are sent in plain text.")
            self.host.quit(C.EXIT_NOT_FOUND)
        await self.output(session_data)
        self.host.quit()
92
93
class EncryptionStart(base.CommandBase):
    """``start`` subcommand: start an encrypted session with an entity."""

    def __init__(self, host):
        super().__init__(
            host, "start",
            help=_("start encrypted session with an entity"))

    def add_parser_options(self):
        self.parser.add_argument(
            "--encrypt-noreplace",
            action="store_true",
            help=_("don't replace encryption algorithm if an other one is already used"))
        # the algorithm may be selected by name or by namespace, never both
        algorithm = self.parser.add_mutually_exclusive_group()
        algorithm.add_argument(
            "-n", "--name", help=_("algorithm name (DEFAULT: choose automatically)"))
        algorithm.add_argument(
            "-N", "--namespace",
            help=_("algorithm namespace (DEFAULT: choose automatically)"))
        self.parser.add_argument(
            "jid",
            # fixed: the help used to say "stop" (copy-pasted from the stop command)
            help=_("jid of the entity to start encrypted session with")
        )

    async def start(self):
        """Resolve the requested algorithm namespace, then start the session.

        The namespace is looked up through the bridge when ``--name`` is given,
        taken verbatim from ``--namespace`` otherwise, and left empty when neither
        option is used.

        NOTE(review): the code following ``self.host.quit(...)`` in the error
        paths assumes ``quit`` does not return — confirm in ``base``.
        """
        if self.args.name is not None:
            try:
                namespace = await self.host.bridge.encryption_namespace_get(
                    self.args.name)
            except Exception as e:
                self.disp(f"can't get encryption namespace: {e}", error=True)
                self.host.quit(C.EXIT_BRIDGE_ERRBACK)
        elif self.args.namespace is not None:
            namespace = self.args.namespace
        else:
            namespace = ""

        jids = await self.host.check_jids([self.args.jid])
        jid = jids[0]

        try:
            # third argument is the "replace" flag, hence the negation of
            # --encrypt-noreplace
            await self.host.bridge.message_encryption_start(
                jid, namespace, not self.args.encrypt_noreplace,
                self.profile)
        except Exception as e:
            # fixed: this failure was reported as a namespace lookup error
            self.disp(f"can't start encrypted session: {e}", error=True)
            self.host.quit(C.EXIT_BRIDGE_ERRBACK)

        self.host.quit()
141
142
class EncryptionStop(base.CommandBase):
    """``stop`` subcommand: end the encrypted session held with an entity."""

    def __init__(self, host):
        super().__init__(
            host,
            "stop",
            help=_("stop encrypted session with an entity"),
        )

    def add_parser_options(self):
        self.parser.add_argument(
            "jid", help=_("jid of the entity to stop encrypted session with"))

    async def start(self):
        """Ask the bridge to stop the session for the given jid, then quit."""
        checked = await self.host.check_jids([self.args.jid])
        entity = checked[0]
        try:
            await self.host.bridge.message_encryption_stop(entity, self.profile)
        except Exception as e:
            self.disp(f"can't end encrypted session: {e}", error=True)
            self.host.quit(C.EXIT_BRIDGE_ERRBACK)

        self.host.quit()
166
167
class TrustUI(base.CommandBase):
    """``ui`` subcommand: retrieve and show the trust management UI for an entity."""

    def __init__(self, host):
        super().__init__(
            host, "ui",
            help=_("get UI to manage trust"))

    def add_parser_options(self):
        self.parser.add_argument(
            "jid",
            # fixed: the help was copy-pasted from the "stop" command
            help=_("jid of the entity to manage trust for")
        )
        # the algorithm may be selected by name or by namespace, never both
        algorithm = self.parser.add_mutually_exclusive_group()
        algorithm.add_argument(
            "-n", "--name", help=_("algorithm name (DEFAULT: current algorithm)"))
        algorithm.add_argument(
            "-N", "--namespace",
            help=_("algorithm namespace (DEFAULT: current algorithm)"))

    async def start(self):
        """Resolve the algorithm namespace, fetch the trust UI and display it.

        The namespace is looked up through the bridge when ``--name`` is given,
        taken verbatim from ``--namespace`` otherwise, and left empty when neither
        option is used.

        NOTE(review): the code following ``self.host.quit(...)`` in the error
        paths assumes ``quit`` does not return — confirm in ``base``.
        """
        if self.args.name is not None:
            try:
                namespace = await self.host.bridge.encryption_namespace_get(
                    self.args.name)
            except Exception as e:
                self.disp(f"can't get encryption namespace: {e}", error=True)
                self.host.quit(C.EXIT_BRIDGE_ERRBACK)
        elif self.args.namespace is not None:
            namespace = self.args.namespace
        else:
            namespace = ""

        jids = await self.host.check_jids([self.args.jid])
        jid = jids[0]

        try:
            xmlui_raw = await self.host.bridge.encryption_trust_ui_get(
                jid, namespace, self.profile)
        except Exception as e:
            self.disp(f"can't get encryption session trust UI: {e}", error=True)
            self.host.quit(C.EXIT_BRIDGE_ERRBACK)

        xmlui = xmlui_manager.create(self.host, xmlui_raw)
        await xmlui.show()
        # the form is submitted for every UI type except dialogs
        if xmlui.type != C.XMLUI_DIALOG:
            await xmlui.submit_form()
        self.host.quit()
214
class EncryptionTrust(base.CommandBase):
    """``trust`` command group; its only subcommand is ``ui``."""

    subcommands = (TrustUI,)

    def __init__(self, host):
        super().__init__(
            # fixed typo in the help text ("manangement")
            host, "trust", use_profile=False, help=_("trust management")
        )
222
223
class Encryption(base.CommandBase):
    """``encryption`` command group gathering the session and trust subcommands."""

    subcommands = (
        EncryptionAlgorithms,
        EncryptionGet,
        EncryptionStart,
        EncryptionStop,
        EncryptionTrust,
    )

    def __init__(self, host):
        super().__init__(
            host,
            "encryption",
            use_profile=False,
            help=_("encryption sessions handling"),
        )