comparison libervia/cli/cmd_encryption.py @ 4270:0d7bb4df2343

Reformatted code base using black.
author Goffi <goffi@goffi.org>
date Wed, 19 Jun 2024 18:44:57 +0200
parents 47401850dec6
children
comparison
equal deleted inserted replaced
4269:64a85ce8be70 4270:0d7bb4df2343
29 class EncryptionAlgorithms(base.CommandBase): 29 class EncryptionAlgorithms(base.CommandBase):
30 30
31 def __init__(self, host): 31 def __init__(self, host):
32 extra_outputs = {"default": self.default_output} 32 extra_outputs = {"default": self.default_output}
33 super(EncryptionAlgorithms, self).__init__( 33 super(EncryptionAlgorithms, self).__init__(
34 host, "algorithms", 34 host,
35 "algorithms",
35 use_output=C.OUTPUT_LIST_DICT, 36 use_output=C.OUTPUT_LIST_DICT,
36 extra_outputs=extra_outputs, 37 extra_outputs=extra_outputs,
37 use_profile=False, 38 use_profile=False,
38 help=_("show available encryption algorithms")) 39 help=_("show available encryption algorithms"),
40 )
39 41
40 def add_parser_options(self): 42 def add_parser_options(self):
41 pass 43 pass
42 44
43 def default_output(self, plugins): 45 def default_output(self, plugins):
44 if not plugins: 46 if not plugins:
45 self.disp(_("No encryption plugin registered!")) 47 self.disp(_("No encryption plugin registered!"))
46 else: 48 else:
47 self.disp(_("Following encryption algorithms are available: {algos}").format( 49 self.disp(
48 algos=', '.join([p['name'] for p in plugins]))) 50 _("Following encryption algorithms are available: {algos}").format(
51 algos=", ".join([p["name"] for p in plugins])
52 )
53 )
49 54
50 async def start(self): 55 async def start(self):
51 try: 56 try:
52 plugins_ser = await self.host.bridge.encryption_plugins_get() 57 plugins_ser = await self.host.bridge.encryption_plugins_get()
53 plugins = data_format.deserialise(plugins_ser, type_check=list) 58 plugins = data_format.deserialise(plugins_ser, type_check=list)
61 66
62 class EncryptionGet(base.CommandBase): 67 class EncryptionGet(base.CommandBase):
63 68
64 def __init__(self, host): 69 def __init__(self, host):
65 super(EncryptionGet, self).__init__( 70 super(EncryptionGet, self).__init__(
66 host, "get", 71 host, "get", use_output=C.OUTPUT_DICT, help=_("get encryption session data")
67 use_output=C.OUTPUT_DICT, 72 )
68 help=_("get encryption session data")) 73
69 74 def add_parser_options(self):
70 def add_parser_options(self): 75 self.parser.add_argument("jid", help=_("jid of the entity to check"))
71 self.parser.add_argument(
72 "jid",
73 help=_("jid of the entity to check")
74 )
75 76
76 async def start(self): 77 async def start(self):
77 jids = await self.host.check_jids([self.args.jid]) 78 jids = await self.host.check_jids([self.args.jid])
78 jid = jids[0] 79 jid = jids[0]
79 try: 80 try:
82 self.disp(f"can't get session: {e}", error=True) 83 self.disp(f"can't get session: {e}", error=True)
83 self.host.quit(C.EXIT_BRIDGE_ERRBACK) 84 self.host.quit(C.EXIT_BRIDGE_ERRBACK)
84 85
85 session_data = data_format.deserialise(serialised) 86 session_data = data_format.deserialise(serialised)
86 if session_data is None: 87 if session_data is None:
87 self.disp( 88 self.disp("No encryption session found, the messages are sent in plain text.")
88 "No encryption session found, the messages are sent in plain text.")
89 self.host.quit(C.EXIT_NOT_FOUND) 89 self.host.quit(C.EXIT_NOT_FOUND)
90 await self.output(session_data) 90 await self.output(session_data)
91 self.host.quit() 91 self.host.quit()
92 92
93 93
94 class EncryptionStart(base.CommandBase): 94 class EncryptionStart(base.CommandBase):
95 95
96 def __init__(self, host): 96 def __init__(self, host):
97 super(EncryptionStart, self).__init__( 97 super(EncryptionStart, self).__init__(
98 host, "start", 98 host, "start", help=_("start encrypted session with an entity")
99 help=_("start encrypted session with an entity")) 99 )
100 100
101 def add_parser_options(self): 101 def add_parser_options(self):
102 self.parser.add_argument( 102 self.parser.add_argument(
103 "--encrypt-noreplace", 103 "--encrypt-noreplace",
104 action="store_true", 104 action="store_true",
105 help=_("don't replace encryption algorithm if an other one is already used")) 105 help=_("don't replace encryption algorithm if an other one is already used"),
106 )
106 algorithm = self.parser.add_mutually_exclusive_group() 107 algorithm = self.parser.add_mutually_exclusive_group()
107 algorithm.add_argument( 108 algorithm.add_argument(
108 "-n", "--name", help=_("algorithm name (DEFAULT: choose automatically)")) 109 "-n", "--name", help=_("algorithm name (DEFAULT: choose automatically)")
109 algorithm.add_argument( 110 )
110 "-N", "--namespace", 111 algorithm.add_argument(
111 help=_("algorithm namespace (DEFAULT: choose automatically)")) 112 "-N",
112 self.parser.add_argument( 113 "--namespace",
113 "jid", 114 help=_("algorithm namespace (DEFAULT: choose automatically)"),
114 help=_("jid of the entity to stop encrypted session with") 115 )
116 self.parser.add_argument(
117 "jid", help=_("jid of the entity to stop encrypted session with")
115 ) 118 )
116 119
117 async def start(self): 120 async def start(self):
118 if self.args.name is not None: 121 if self.args.name is not None:
119 try: 122 try:
120 namespace = await self.host.bridge.encryption_namespace_get(self.args.name) 123 namespace = await self.host.bridge.encryption_namespace_get(
124 self.args.name
125 )
121 except Exception as e: 126 except Exception as e:
122 self.disp(f"can't get encryption namespace: {e}", error=True) 127 self.disp(f"can't get encryption namespace: {e}", error=True)
123 self.host.quit(C.EXIT_BRIDGE_ERRBACK) 128 self.host.quit(C.EXIT_BRIDGE_ERRBACK)
124 elif self.args.namespace is not None: 129 elif self.args.namespace is not None:
125 namespace = self.args.namespace 130 namespace = self.args.namespace
129 jids = await self.host.check_jids([self.args.jid]) 134 jids = await self.host.check_jids([self.args.jid])
130 jid = jids[0] 135 jid = jids[0]
131 136
132 try: 137 try:
133 await self.host.bridge.message_encryption_start( 138 await self.host.bridge.message_encryption_start(
134 jid, namespace, not self.args.encrypt_noreplace, 139 jid, namespace, not self.args.encrypt_noreplace, self.profile
135 self.profile) 140 )
136 except Exception as e: 141 except Exception as e:
137 self.disp(f"can't get encryption namespace: {e}", error=True) 142 self.disp(f"can't get encryption namespace: {e}", error=True)
138 self.host.quit(C.EXIT_BRIDGE_ERRBACK) 143 self.host.quit(C.EXIT_BRIDGE_ERRBACK)
139 144
140 self.host.quit() 145 self.host.quit()
142 147
143 class EncryptionStop(base.CommandBase): 148 class EncryptionStop(base.CommandBase):
144 149
145 def __init__(self, host): 150 def __init__(self, host):
146 super(EncryptionStop, self).__init__( 151 super(EncryptionStop, self).__init__(
147 host, "stop", 152 host, "stop", help=_("stop encrypted session with an entity")
148 help=_("stop encrypted session with an entity")) 153 )
149 154
150 def add_parser_options(self): 155 def add_parser_options(self):
151 self.parser.add_argument( 156 self.parser.add_argument(
152 "jid", 157 "jid", help=_("jid of the entity to stop encrypted session with")
153 help=_("jid of the entity to stop encrypted session with")
154 ) 158 )
155 159
156 async def start(self): 160 async def start(self):
157 jids = await self.host.check_jids([self.args.jid]) 161 jids = await self.host.check_jids([self.args.jid])
158 jid = jids[0] 162 jid = jids[0]
166 170
167 171
168 class TrustUI(base.CommandBase): 172 class TrustUI(base.CommandBase):
169 173
170 def __init__(self, host): 174 def __init__(self, host):
171 super(TrustUI, self).__init__( 175 super(TrustUI, self).__init__(host, "ui", help=_("get UI to manage trust"))
172 host, "ui", 176
173 help=_("get UI to manage trust")) 177 def add_parser_options(self):
174 178 self.parser.add_argument(
175 def add_parser_options(self): 179 "jid", help=_("jid of the entity to stop encrypted session with")
176 self.parser.add_argument(
177 "jid",
178 help=_("jid of the entity to stop encrypted session with")
179 ) 180 )
180 algorithm = self.parser.add_mutually_exclusive_group() 181 algorithm = self.parser.add_mutually_exclusive_group()
181 algorithm.add_argument( 182 algorithm.add_argument(
182 "-n", "--name", help=_("algorithm name (DEFAULT: current algorithm)")) 183 "-n", "--name", help=_("algorithm name (DEFAULT: current algorithm)")
183 algorithm.add_argument( 184 )
184 "-N", "--namespace", 185 algorithm.add_argument(
185 help=_("algorithm namespace (DEFAULT: current algorithm)")) 186 "-N",
187 "--namespace",
188 help=_("algorithm namespace (DEFAULT: current algorithm)"),
189 )
186 190
187 async def start(self): 191 async def start(self):
188 if self.args.name is not None: 192 if self.args.name is not None:
189 try: 193 try:
190 namespace = await self.host.bridge.encryption_namespace_get(self.args.name) 194 namespace = await self.host.bridge.encryption_namespace_get(
195 self.args.name
196 )
191 except Exception as e: 197 except Exception as e:
192 self.disp(f"can't get encryption namespace: {e}", error=True) 198 self.disp(f"can't get encryption namespace: {e}", error=True)
193 self.host.quit(C.EXIT_BRIDGE_ERRBACK) 199 self.host.quit(C.EXIT_BRIDGE_ERRBACK)
194 elif self.args.namespace is not None: 200 elif self.args.namespace is not None:
195 namespace = self.args.namespace 201 namespace = self.args.namespace
199 jids = await self.host.check_jids([self.args.jid]) 205 jids = await self.host.check_jids([self.args.jid])
200 jid = jids[0] 206 jid = jids[0]
201 207
202 try: 208 try:
203 xmlui_raw = await self.host.bridge.encryption_trust_ui_get( 209 xmlui_raw = await self.host.bridge.encryption_trust_ui_get(
204 jid, namespace, self.profile) 210 jid, namespace, self.profile
211 )
205 except Exception as e: 212 except Exception as e:
206 self.disp(f"can't get encryption session trust UI: {e}", error=True) 213 self.disp(f"can't get encryption session trust UI: {e}", error=True)
207 self.host.quit(C.EXIT_BRIDGE_ERRBACK) 214 self.host.quit(C.EXIT_BRIDGE_ERRBACK)
208 215
209 xmlui = xmlui_manager.create(self.host, xmlui_raw) 216 xmlui = xmlui_manager.create(self.host, xmlui_raw)
210 await xmlui.show() 217 await xmlui.show()
211 if xmlui.type != C.XMLUI_DIALOG: 218 if xmlui.type != C.XMLUI_DIALOG:
212 await xmlui.submit_form() 219 await xmlui.submit_form()
213 self.host.quit() 220 self.host.quit()
214 221
222
215 class EncryptionTrust(base.CommandBase): 223 class EncryptionTrust(base.CommandBase):
216 subcommands = (TrustUI,) 224 subcommands = (TrustUI,)
217 225
218 def __init__(self, host): 226 def __init__(self, host):
219 super(EncryptionTrust, self).__init__( 227 super(EncryptionTrust, self).__init__(
220 host, "trust", use_profile=False, help=_("trust manangement") 228 host, "trust", use_profile=False, help=_("trust manangement")
221 ) 229 )
222 230
223 231
224 class Encryption(base.CommandBase): 232 class Encryption(base.CommandBase):
225 subcommands = (EncryptionAlgorithms, EncryptionGet, EncryptionStart, EncryptionStop, 233 subcommands = (
226 EncryptionTrust) 234 EncryptionAlgorithms,
235 EncryptionGet,
236 EncryptionStart,
237 EncryptionStop,
238 EncryptionTrust,
239 )
227 240
228 def __init__(self, host): 241 def __init__(self, host):
229 super(Encryption, self).__init__( 242 super(Encryption, self).__init__(
230 host, "encryption", use_profile=False, help=_("encryption sessions handling") 243 host, "encryption", use_profile=False, help=_("encryption sessions handling")
231 ) 244 )