Mercurial > libervia-backend
comparison libervia/cli/cmd_encryption.py @ 4270:0d7bb4df2343
Reformatted code base using black.
author | Goffi <goffi@goffi.org> |
---|---|
date | Wed, 19 Jun 2024 18:44:57 +0200 |
parents | 47401850dec6 |
children |
comparison
equal
deleted
inserted
replaced
4269:64a85ce8be70 | 4270:0d7bb4df2343 |
---|---|
29 class EncryptionAlgorithms(base.CommandBase): | 29 class EncryptionAlgorithms(base.CommandBase): |
30 | 30 |
31 def __init__(self, host): | 31 def __init__(self, host): |
32 extra_outputs = {"default": self.default_output} | 32 extra_outputs = {"default": self.default_output} |
33 super(EncryptionAlgorithms, self).__init__( | 33 super(EncryptionAlgorithms, self).__init__( |
34 host, "algorithms", | 34 host, |
35 "algorithms", | |
35 use_output=C.OUTPUT_LIST_DICT, | 36 use_output=C.OUTPUT_LIST_DICT, |
36 extra_outputs=extra_outputs, | 37 extra_outputs=extra_outputs, |
37 use_profile=False, | 38 use_profile=False, |
38 help=_("show available encryption algorithms")) | 39 help=_("show available encryption algorithms"), |
40 ) | |
39 | 41 |
40 def add_parser_options(self): | 42 def add_parser_options(self): |
41 pass | 43 pass |
42 | 44 |
43 def default_output(self, plugins): | 45 def default_output(self, plugins): |
44 if not plugins: | 46 if not plugins: |
45 self.disp(_("No encryption plugin registered!")) | 47 self.disp(_("No encryption plugin registered!")) |
46 else: | 48 else: |
47 self.disp(_("Following encryption algorithms are available: {algos}").format( | 49 self.disp( |
48 algos=', '.join([p['name'] for p in plugins]))) | 50 _("Following encryption algorithms are available: {algos}").format( |
51 algos=", ".join([p["name"] for p in plugins]) | |
52 ) | |
53 ) | |
49 | 54 |
50 async def start(self): | 55 async def start(self): |
51 try: | 56 try: |
52 plugins_ser = await self.host.bridge.encryption_plugins_get() | 57 plugins_ser = await self.host.bridge.encryption_plugins_get() |
53 plugins = data_format.deserialise(plugins_ser, type_check=list) | 58 plugins = data_format.deserialise(plugins_ser, type_check=list) |
61 | 66 |
62 class EncryptionGet(base.CommandBase): | 67 class EncryptionGet(base.CommandBase): |
63 | 68 |
64 def __init__(self, host): | 69 def __init__(self, host): |
65 super(EncryptionGet, self).__init__( | 70 super(EncryptionGet, self).__init__( |
66 host, "get", | 71 host, "get", use_output=C.OUTPUT_DICT, help=_("get encryption session data") |
67 use_output=C.OUTPUT_DICT, | 72 ) |
68 help=_("get encryption session data")) | 73 |
69 | 74 def add_parser_options(self): |
70 def add_parser_options(self): | 75 self.parser.add_argument("jid", help=_("jid of the entity to check")) |
71 self.parser.add_argument( | |
72 "jid", | |
73 help=_("jid of the entity to check") | |
74 ) | |
75 | 76 |
76 async def start(self): | 77 async def start(self): |
77 jids = await self.host.check_jids([self.args.jid]) | 78 jids = await self.host.check_jids([self.args.jid]) |
78 jid = jids[0] | 79 jid = jids[0] |
79 try: | 80 try: |
82 self.disp(f"can't get session: {e}", error=True) | 83 self.disp(f"can't get session: {e}", error=True) |
83 self.host.quit(C.EXIT_BRIDGE_ERRBACK) | 84 self.host.quit(C.EXIT_BRIDGE_ERRBACK) |
84 | 85 |
85 session_data = data_format.deserialise(serialised) | 86 session_data = data_format.deserialise(serialised) |
86 if session_data is None: | 87 if session_data is None: |
87 self.disp( | 88 self.disp("No encryption session found, the messages are sent in plain text.") |
88 "No encryption session found, the messages are sent in plain text.") | |
89 self.host.quit(C.EXIT_NOT_FOUND) | 89 self.host.quit(C.EXIT_NOT_FOUND) |
90 await self.output(session_data) | 90 await self.output(session_data) |
91 self.host.quit() | 91 self.host.quit() |
92 | 92 |
93 | 93 |
94 class EncryptionStart(base.CommandBase): | 94 class EncryptionStart(base.CommandBase): |
95 | 95 |
96 def __init__(self, host): | 96 def __init__(self, host): |
97 super(EncryptionStart, self).__init__( | 97 super(EncryptionStart, self).__init__( |
98 host, "start", | 98 host, "start", help=_("start encrypted session with an entity") |
99 help=_("start encrypted session with an entity")) | 99 ) |
100 | 100 |
101 def add_parser_options(self): | 101 def add_parser_options(self): |
102 self.parser.add_argument( | 102 self.parser.add_argument( |
103 "--encrypt-noreplace", | 103 "--encrypt-noreplace", |
104 action="store_true", | 104 action="store_true", |
105 help=_("don't replace encryption algorithm if an other one is already used")) | 105 help=_("don't replace encryption algorithm if an other one is already used"), |
106 ) | |
106 algorithm = self.parser.add_mutually_exclusive_group() | 107 algorithm = self.parser.add_mutually_exclusive_group() |
107 algorithm.add_argument( | 108 algorithm.add_argument( |
108 "-n", "--name", help=_("algorithm name (DEFAULT: choose automatically)")) | 109 "-n", "--name", help=_("algorithm name (DEFAULT: choose automatically)") |
109 algorithm.add_argument( | 110 ) |
110 "-N", "--namespace", | 111 algorithm.add_argument( |
111 help=_("algorithm namespace (DEFAULT: choose automatically)")) | 112 "-N", |
112 self.parser.add_argument( | 113 "--namespace", |
113 "jid", | 114 help=_("algorithm namespace (DEFAULT: choose automatically)"), |
114 help=_("jid of the entity to stop encrypted session with") | 115 ) |
116 self.parser.add_argument( | |
117 "jid", help=_("jid of the entity to stop encrypted session with") | |
115 ) | 118 ) |
116 | 119 |
117 async def start(self): | 120 async def start(self): |
118 if self.args.name is not None: | 121 if self.args.name is not None: |
119 try: | 122 try: |
120 namespace = await self.host.bridge.encryption_namespace_get(self.args.name) | 123 namespace = await self.host.bridge.encryption_namespace_get( |
124 self.args.name | |
125 ) | |
121 except Exception as e: | 126 except Exception as e: |
122 self.disp(f"can't get encryption namespace: {e}", error=True) | 127 self.disp(f"can't get encryption namespace: {e}", error=True) |
123 self.host.quit(C.EXIT_BRIDGE_ERRBACK) | 128 self.host.quit(C.EXIT_BRIDGE_ERRBACK) |
124 elif self.args.namespace is not None: | 129 elif self.args.namespace is not None: |
125 namespace = self.args.namespace | 130 namespace = self.args.namespace |
129 jids = await self.host.check_jids([self.args.jid]) | 134 jids = await self.host.check_jids([self.args.jid]) |
130 jid = jids[0] | 135 jid = jids[0] |
131 | 136 |
132 try: | 137 try: |
133 await self.host.bridge.message_encryption_start( | 138 await self.host.bridge.message_encryption_start( |
134 jid, namespace, not self.args.encrypt_noreplace, | 139 jid, namespace, not self.args.encrypt_noreplace, self.profile |
135 self.profile) | 140 ) |
136 except Exception as e: | 141 except Exception as e: |
137 self.disp(f"can't get encryption namespace: {e}", error=True) | 142 self.disp(f"can't get encryption namespace: {e}", error=True) |
138 self.host.quit(C.EXIT_BRIDGE_ERRBACK) | 143 self.host.quit(C.EXIT_BRIDGE_ERRBACK) |
139 | 144 |
140 self.host.quit() | 145 self.host.quit() |
142 | 147 |
143 class EncryptionStop(base.CommandBase): | 148 class EncryptionStop(base.CommandBase): |
144 | 149 |
145 def __init__(self, host): | 150 def __init__(self, host): |
146 super(EncryptionStop, self).__init__( | 151 super(EncryptionStop, self).__init__( |
147 host, "stop", | 152 host, "stop", help=_("stop encrypted session with an entity") |
148 help=_("stop encrypted session with an entity")) | 153 ) |
149 | 154 |
150 def add_parser_options(self): | 155 def add_parser_options(self): |
151 self.parser.add_argument( | 156 self.parser.add_argument( |
152 "jid", | 157 "jid", help=_("jid of the entity to stop encrypted session with") |
153 help=_("jid of the entity to stop encrypted session with") | |
154 ) | 158 ) |
155 | 159 |
156 async def start(self): | 160 async def start(self): |
157 jids = await self.host.check_jids([self.args.jid]) | 161 jids = await self.host.check_jids([self.args.jid]) |
158 jid = jids[0] | 162 jid = jids[0] |
166 | 170 |
167 | 171 |
168 class TrustUI(base.CommandBase): | 172 class TrustUI(base.CommandBase): |
169 | 173 |
170 def __init__(self, host): | 174 def __init__(self, host): |
171 super(TrustUI, self).__init__( | 175 super(TrustUI, self).__init__(host, "ui", help=_("get UI to manage trust")) |
172 host, "ui", | 176 |
173 help=_("get UI to manage trust")) | 177 def add_parser_options(self): |
174 | 178 self.parser.add_argument( |
175 def add_parser_options(self): | 179 "jid", help=_("jid of the entity to stop encrypted session with") |
176 self.parser.add_argument( | |
177 "jid", | |
178 help=_("jid of the entity to stop encrypted session with") | |
179 ) | 180 ) |
180 algorithm = self.parser.add_mutually_exclusive_group() | 181 algorithm = self.parser.add_mutually_exclusive_group() |
181 algorithm.add_argument( | 182 algorithm.add_argument( |
182 "-n", "--name", help=_("algorithm name (DEFAULT: current algorithm)")) | 183 "-n", "--name", help=_("algorithm name (DEFAULT: current algorithm)") |
183 algorithm.add_argument( | 184 ) |
184 "-N", "--namespace", | 185 algorithm.add_argument( |
185 help=_("algorithm namespace (DEFAULT: current algorithm)")) | 186 "-N", |
187 "--namespace", | |
188 help=_("algorithm namespace (DEFAULT: current algorithm)"), | |
189 ) | |
186 | 190 |
187 async def start(self): | 191 async def start(self): |
188 if self.args.name is not None: | 192 if self.args.name is not None: |
189 try: | 193 try: |
190 namespace = await self.host.bridge.encryption_namespace_get(self.args.name) | 194 namespace = await self.host.bridge.encryption_namespace_get( |
195 self.args.name | |
196 ) | |
191 except Exception as e: | 197 except Exception as e: |
192 self.disp(f"can't get encryption namespace: {e}", error=True) | 198 self.disp(f"can't get encryption namespace: {e}", error=True) |
193 self.host.quit(C.EXIT_BRIDGE_ERRBACK) | 199 self.host.quit(C.EXIT_BRIDGE_ERRBACK) |
194 elif self.args.namespace is not None: | 200 elif self.args.namespace is not None: |
195 namespace = self.args.namespace | 201 namespace = self.args.namespace |
199 jids = await self.host.check_jids([self.args.jid]) | 205 jids = await self.host.check_jids([self.args.jid]) |
200 jid = jids[0] | 206 jid = jids[0] |
201 | 207 |
202 try: | 208 try: |
203 xmlui_raw = await self.host.bridge.encryption_trust_ui_get( | 209 xmlui_raw = await self.host.bridge.encryption_trust_ui_get( |
204 jid, namespace, self.profile) | 210 jid, namespace, self.profile |
211 ) | |
205 except Exception as e: | 212 except Exception as e: |
206 self.disp(f"can't get encryption session trust UI: {e}", error=True) | 213 self.disp(f"can't get encryption session trust UI: {e}", error=True) |
207 self.host.quit(C.EXIT_BRIDGE_ERRBACK) | 214 self.host.quit(C.EXIT_BRIDGE_ERRBACK) |
208 | 215 |
209 xmlui = xmlui_manager.create(self.host, xmlui_raw) | 216 xmlui = xmlui_manager.create(self.host, xmlui_raw) |
210 await xmlui.show() | 217 await xmlui.show() |
211 if xmlui.type != C.XMLUI_DIALOG: | 218 if xmlui.type != C.XMLUI_DIALOG: |
212 await xmlui.submit_form() | 219 await xmlui.submit_form() |
213 self.host.quit() | 220 self.host.quit() |
214 | 221 |
222 | |
215 class EncryptionTrust(base.CommandBase): | 223 class EncryptionTrust(base.CommandBase): |
216 subcommands = (TrustUI,) | 224 subcommands = (TrustUI,) |
217 | 225 |
218 def __init__(self, host): | 226 def __init__(self, host): |
219 super(EncryptionTrust, self).__init__( | 227 super(EncryptionTrust, self).__init__( |
220 host, "trust", use_profile=False, help=_("trust manangement") | 228 host, "trust", use_profile=False, help=_("trust manangement") |
221 ) | 229 ) |
222 | 230 |
223 | 231 |
224 class Encryption(base.CommandBase): | 232 class Encryption(base.CommandBase): |
225 subcommands = (EncryptionAlgorithms, EncryptionGet, EncryptionStart, EncryptionStop, | 233 subcommands = ( |
226 EncryptionTrust) | 234 EncryptionAlgorithms, |
235 EncryptionGet, | |
236 EncryptionStart, | |
237 EncryptionStop, | |
238 EncryptionTrust, | |
239 ) | |
227 | 240 |
228 def __init__(self, host): | 241 def __init__(self, host): |
229 super(Encryption, self).__init__( | 242 super(Encryption, self).__init__( |
230 host, "encryption", use_profile=False, help=_("encryption sessions handling") | 243 host, "encryption", use_profile=False, help=_("encryption sessions handling") |
231 ) | 244 ) |