Mercurial > libervia-backend
diff sat_frontends/jp/cmd_encryption.py @ 2740:8fd8ce5a5855
jp (message/send, encryption): encryption handling:
- encryption algorithm can now be requested when sending a message (using --encrypt option)
- new encryption commands to (de)activate encryption session, check available algorithms, or manage trust.
author | Goffi <goffi@goffi.org> |
---|---|
date | Wed, 02 Jan 2019 18:50:57 +0100 |
parents | |
children | 003b8b4b56a7 |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/sat_frontends/jp/cmd_encryption.py Wed Jan 02 18:50:57 2019 +0100 @@ -0,0 +1,253 @@ +#!/usr/bin/env python2 +# -*- coding: utf-8 -*- + +# jp: a SAT command line tool +# Copyright (C) 2009-2018 Jérôme Poisson (goffi@goffi.org) + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +from sat_frontends.jp import base +from sat_frontends.jp.constants import Const as C +from sat.core.i18n import _ +from functools import partial +from sat.tools.common import data_format +from sat_frontends.jp import xmlui_manager + +__commands__ = ["Encryption"] + + +class EncryptionAlgorithms(base.CommandBase): + + def __init__(self, host): + extra_outputs = {"default": self.default_output} + super(EncryptionAlgorithms, self).__init__( + host, "algorithms", + use_output=C.OUTPUT_LIST_DICT, + extra_outputs=extra_outputs, + use_profile=False, + help=_("show available encryption algorithms")) + self.need_loop = True + + def add_parser_options(self): + pass + + def encryptionPluginsGetCb(self, plugins): + self.output(plugins) + self.host.quit() + + def default_output(self, plugins): + if not plugins: + self.disp(_(u"No encryption plugin registered!")) + self.host.quit(C.EXIT_NOT_FOUND) + else: + self.disp(_(u"Following encryption algorithms are available: {algos}").format( + algos=', '.join([p['name'] for p in plugins]))) + self.host.quit() + + def start(self): + self.host.bridge.encryptionPluginsGet( + callback=self.encryptionPluginsGetCb, + errback=partial( + self.errback, + msg=_(u"can't retrieve plugins: {}"), + exit_code=C.EXIT_BRIDGE_ERRBACK, + ), + ) + + +class EncryptionGet(base.CommandBase): + + def __init__(self, host): + super(EncryptionGet, self).__init__( + host, "get", + use_output=C.OUTPUT_DICT, + help=_(u"get encryption session data")) + self.need_loop = True + + def add_parser_options(self): + self.parser.add_argument( + "jid", type=base.unicode_decoder, + help=_(u"jid of the entity to check") + ) + + def messageEncryptionGetCb(self, serialised): + session_data = data_format.deserialise(serialised) + if session_data is None: + self.disp( + u"No encryption session found, the messages are sent in plain text.") + self.host.quit(C.EXIT_NOT_FOUND) + self.output(session_data) + self.host.quit() + + def start(self): + jids = self.host.check_jids([self.args.jid]) + jid = jids[0] + self.host.bridge.messageEncryptionGet( + jid, self.profile, + callback=self.messageEncryptionGetCb, + errback=partial( + self.errback, + msg=_(u"can't get session: {}"), + exit_code=C.EXIT_BRIDGE_ERRBACK, + ), + ) + + +class EncryptionStart(base.CommandBase): + + def __init__(self, host): + super(EncryptionStart, self).__init__( + host, "start", + help=_(u"start encrypted session with an entity")) + self.need_loop = True + + def add_parser_options(self): + self.parser.add_argument( + "--encrypt-noreplace", + action="store_true", + help=_(u"don't replace encryption algorithm if an other one is already used")) + algorithm = self.parser.add_mutually_exclusive_group() + algorithm.add_argument( + "-n", "--name", help=_(u"algorithm name (DEFAULT: choose automatically)")) + algorithm.add_argument( + "-N", "--namespace", + help=_(u"algorithm namespace (DEFAULT: choose automatically)")) + self.parser.add_argument( + "jid", type=base.unicode_decoder, + help=_(u"jid of the entity to stop encrypted session with") + ) + + def encryptionNamespaceGetCb(self, namespace): + jids = self.host.check_jids([self.args.jid]) + jid = jids[0] + self.host.bridge.messageEncryptionStart( + jid, namespace, not self.args.encrypt_noreplace, + self.profile, + callback=self.host.quit, + errback=partial(self.errback, + msg=_(u"Can't start encryption session: {}"), + exit_code=C.EXIT_BRIDGE_ERRBACK, + )) + + def start(self): + if self.args.name is not None: + self.host.bridge.encryptionNamespaceGet(self.args.name, + callback=self.encryptionNamespaceGetCb, + errback=partial(self.errback, + msg=_(u"Can't get encryption namespace: {}"), + exit_code=C.EXIT_BRIDGE_ERRBACK, + )) + elif self.args.namespace is not None: + self.encryptionNamespaceGetCb(self.args.namespace) + else: + self.encryptionNamespaceGetCb(u"") + + +class EncryptionStop(base.CommandBase): + + def __init__(self, host): + super(EncryptionStop, self).__init__( + host, "stop", + help=_(u"stop encrypted session with an entity")) + self.need_loop = True + + def add_parser_options(self): + self.parser.add_argument( + "jid", type=base.unicode_decoder, + help=_(u"jid of the entity to stop encrypted session with") + ) + + def start(self): + jids = self.host.check_jids([self.args.jid]) + jid = jids[0] + self.host.bridge.messageEncryptionStop( + jid, self.profile, + callback=self.host.quit, + errback=partial( + self.errback, + msg=_(u"can't end encrypted session: {}"), + exit_code=C.EXIT_BRIDGE_ERRBACK, + ), + ) + + +class TrustUI(base.CommandBase): + + def __init__(self, host): + super(TrustUI, self).__init__( + host, "ui", + help=_(u"get UI to manage trust")) + self.need_loop = True + + def add_parser_options(self): + self.parser.add_argument( + "jid", type=base.unicode_decoder, + help=_(u"jid of the entity to stop encrypted session with") + ) + algorithm = self.parser.add_mutually_exclusive_group() + algorithm.add_argument( + "-n", "--name", help=_(u"algorithm name (DEFAULT: current algorithm)")) + algorithm.add_argument( + "-N", "--namespace", + help=_(u"algorithm namespace (DEFAULT: current algorithm)")) + + def encryptionTrustUIGetCb(self, xmlui_raw): + xmlui = xmlui_manager.create(self.host, xmlui_raw) + xmlui.show() + xmlui.submitForm() + + def encryptionNamespaceGetCb(self, namespace): + jids = self.host.check_jids([self.args.jid]) + jid = jids[0] + self.host.bridge.encryptionTrustUIGet( + jid, namespace, self.profile, + callback=self.encryptionTrustUIGetCb, + errback=partial( + self.errback, + msg=_(u"can't end encrypted session: {}"), + exit_code=C.EXIT_BRIDGE_ERRBACK, + ), + ) + + def start(self): + if self.args.name is not None: + self.host.bridge.encryptionNamespaceGet(self.args.name, + callback=self.encryptionNamespaceGetCb, + errback=partial(self.errback, + msg=_(u"Can't get encryption namespace: {}"), + exit_code=C.EXIT_BRIDGE_ERRBACK, + )) + elif self.args.namespace is not None: + self.encryptionNamespaceGetCb(self.args.namespace) + else: + self.encryptionNamespaceGetCb(u"") + + +class EncryptionTrust(base.CommandBase): + subcommands = (TrustUI,) + + def __init__(self, host): + super(EncryptionTrust, self).__init__( + host, "trust", use_profile=False, help=_(u"trust manangement") + ) + + +class Encryption(base.CommandBase): + subcommands = (EncryptionAlgorithms, EncryptionGet, EncryptionStart, EncryptionStop, + EncryptionTrust) + + def __init__(self, host): + super(Encryption, self).__init__( + host, "encryption", use_profile=False, help=_(u"encryption sessions handling") + )