view sat_frontends/jp/cmd_encryption.py @ 3032:95e2fd14a761

plugin XEP-0329: fixed ShareNode item() and values after Python 3 port
author Goffi <goffi@goffi.org>
date Fri, 16 Aug 2019 17:06:44 +0200
parents ab2696e34d29
children fee60f17ebac
line wrap: on
line source

#!/usr/bin/env python2
# -*- coding: utf-8 -*-

# jp: a SAT command line tool
# Copyright (C) 2009-2019 Jérôme Poisson (goffi@goffi.org)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

from sat_frontends.jp import base
from sat_frontends.jp.constants import Const as C
from sat.core.i18n import _
from functools import partial
from sat.tools.common import data_format
from sat_frontends.jp import xmlui_manager

# Names of the top-level command classes defined in this module
# (presumably read by jp's command loader to register them -- TODO confirm).
__commands__ = ["Encryption"]


class EncryptionAlgorithms(base.CommandBase):
    """``algorithms`` subcommand: show the available encryption algorithms."""

    def __init__(self, host):
        super(EncryptionAlgorithms, self).__init__(
            host,
            "algorithms",
            use_output=C.OUTPUT_LIST_DICT,
            # "default" output is rendered by this class itself
            extra_outputs={"default": self.default_output},
            use_profile=False,
            help=_("show available encryption algorithms"),
        )
        self.need_loop = True

    def add_parser_options(self):
        # this command takes no argument of its own
        pass

    def encryptionPluginsGetCb(self, plugins):
        """Bridge callback: send the plugins to the selected output, then quit."""
        self.output(plugins)
        self.host.quit()

    def default_output(self, plugins):
        """Display plugin names on one line, or an error if there is none.

        Quits with C.EXIT_NOT_FOUND when ``plugins`` is empty.
        """
        if plugins:
            names = ', '.join(plugin['name'] for plugin in plugins)
            message = _("Following encryption algorithms are available: {algos}")
            self.disp(message.format(algos=names))
            self.host.quit()
        else:
            self.disp(_("No encryption plugin registered!"))
            self.host.quit(C.EXIT_NOT_FOUND)

    def start(self):
        """Request the registered encryption plugins from the bridge."""
        on_error = partial(
            self.errback,
            msg=_("can't retrieve plugins: {}"),
            exit_code=C.EXIT_BRIDGE_ERRBACK,
        )
        self.host.bridge.encryptionPluginsGet(
            callback=self.encryptionPluginsGetCb,
            errback=on_error,
        )


class EncryptionGet(base.CommandBase):
    """``get`` subcommand: display encryption session data for an entity."""

    def __init__(self, host):
        super(EncryptionGet, self).__init__(
            host, "get",
            use_output=C.OUTPUT_DICT,
            help=_("get encryption session data"))
        self.need_loop = True

    def add_parser_options(self):
        """Declare the positional ``jid`` argument."""
        self.parser.add_argument(
            "jid",
            help=_("jid of the entity to check")
        )

    def messageEncryptionGetCb(self, serialised):
        """Bridge callback: output the session data, or report that none exists.

        ``serialised`` is decoded with data_format.deserialise; a None result
        means there is no session, in which case a message is displayed and the
        command quits with C.EXIT_NOT_FOUND.
        """
        session_data = data_format.deserialise(serialised)
        if session_data is None:
            # translated like every other user-facing message of this module
            self.disp(
                _("No encryption session found, the messages are sent in plain text."))
            self.host.quit(C.EXIT_NOT_FOUND)
            # don't rely on quit() never returning: there is nothing to output
            return
        self.output(session_data)
        self.host.quit()

    def start(self):
        """Check the jid given on the command line and request its session."""
        jids = self.host.check_jids([self.args.jid])
        jid = jids[0]
        self.host.bridge.messageEncryptionGet(
            jid, self.profile,
            callback=self.messageEncryptionGetCb,
            errback=partial(
                self.errback,
                msg=_("can't get session: {}"),
                exit_code=C.EXIT_BRIDGE_ERRBACK,
            ),
        )


class EncryptionStart(base.CommandBase):
    """``start`` subcommand: start an encrypted session with an entity."""

    def __init__(self, host):
        super(EncryptionStart, self).__init__(
            host, "start",
            help=_("start encrypted session with an entity"))
        self.need_loop = True

    def add_parser_options(self):
        """Declare ``--encrypt-noreplace``, ``-n``/``-N`` and the ``jid`` argument.

        ``--name`` and ``--namespace`` are mutually exclusive.
        """
        self.parser.add_argument(
            "--encrypt-noreplace",
            action="store_true",
            help=_("don't replace encryption algorithm if an other one is already used"))
        algorithm = self.parser.add_mutually_exclusive_group()
        algorithm.add_argument(
            "-n", "--name", help=_("algorithm name (DEFAULT: choose automatically)"))
        algorithm.add_argument(
            "-N", "--namespace",
            help=_("algorithm namespace (DEFAULT: choose automatically)"))
        self.parser.add_argument(
            "jid",
            help=_("jid of the entity to start encrypted session with")
        )

    def encryptionNamespaceGetCb(self, namespace):
        """Start the session once the algorithm namespace is known.

        ``namespace`` is an empty string when neither --name nor --namespace
        was given on the command line (see start()).
        """
        jids = self.host.check_jids([self.args.jid])
        jid = jids[0]
        self.host.bridge.messageEncryptionStart(
            # third argument is the negation of --encrypt-noreplace
            jid, namespace, not self.args.encrypt_noreplace,
            self.profile,
            callback=self.host.quit,
            errback=partial(self.errback,
                            msg=_("Can't start encryption session: {}"),
                            exit_code=C.EXIT_BRIDGE_ERRBACK,
                            ))

    def start(self):
        """Resolve the namespace to use, then start the session.

        A --name is first converted to a namespace through the bridge; a
        --namespace is used as is; otherwise an empty namespace is passed.
        """
        if self.args.name is not None:
            self.host.bridge.encryptionNamespaceGet(self.args.name,
                callback=self.encryptionNamespaceGetCb,
                errback=partial(self.errback,
                                msg=_("Can't get encryption namespace: {}"),
                                exit_code=C.EXIT_BRIDGE_ERRBACK,
                                ))
        elif self.args.namespace is not None:
            self.encryptionNamespaceGetCb(self.args.namespace)
        else:
            self.encryptionNamespaceGetCb("")


class EncryptionStop(base.CommandBase):
    """``stop`` subcommand: stop the encrypted session with an entity."""

    def __init__(self, host):
        super(EncryptionStop, self).__init__(
            host,
            "stop",
            help=_("stop encrypted session with an entity"),
        )
        self.need_loop = True

    def add_parser_options(self):
        """Declare the positional ``jid`` argument."""
        jid_help = _("jid of the entity to stop encrypted session with")
        self.parser.add_argument("jid", help=jid_help)

    def start(self):
        """Check the jid given on the command line and stop its session."""
        jid = self.host.check_jids([self.args.jid])[0]
        on_error = partial(
            self.errback,
            msg=_("can't end encrypted session: {}"),
            exit_code=C.EXIT_BRIDGE_ERRBACK,
        )
        self.host.bridge.messageEncryptionStop(
            jid,
            self.profile,
            callback=self.host.quit,
            errback=on_error,
        )


class TrustUI(base.CommandBase):
    """``ui`` subcommand: get and show the UI used to manage trust."""

    def __init__(self, host):
        super(TrustUI, self).__init__(
            host, "ui",
            help=_("get UI to manage trust"))
        self.need_loop = True

    def add_parser_options(self):
        """Declare the ``jid`` argument and the ``-n``/``-N`` algorithm options.

        ``--name`` and ``--namespace`` are mutually exclusive.
        """
        self.parser.add_argument(
            "jid",
            help=_("jid of the entity to manage trust for")
        )
        algorithm = self.parser.add_mutually_exclusive_group()
        algorithm.add_argument(
            "-n", "--name", help=_("algorithm name (DEFAULT: current algorithm)"))
        algorithm.add_argument(
            "-N", "--namespace",
            help=_("algorithm namespace (DEFAULT: current algorithm)"))

    def encryptionTrustUIGetCb(self, xmlui_raw):
        """Bridge callback: build the XMLUI from raw data, show it, submit it."""
        xmlui = xmlui_manager.create(self.host, xmlui_raw)
        xmlui.show()
        xmlui.submitForm()

    def encryptionNamespaceGetCb(self, namespace):
        """Request the trust UI once the algorithm namespace is known.

        ``namespace`` is an empty string when neither --name nor --namespace
        was given on the command line (see start()).
        """
        jids = self.host.check_jids([self.args.jid])
        jid = jids[0]
        self.host.bridge.encryptionTrustUIGet(
            jid, namespace, self.profile,
            callback=self.encryptionTrustUIGetCb,
            errback=partial(
                self.errback,
                msg=_("can't get encryption session trust UI: {}"),
                exit_code=C.EXIT_BRIDGE_ERRBACK,
            ),
        )

    def start(self):
        """Resolve the namespace to use, then request the trust UI.

        A --name is first converted to a namespace through the bridge; a
        --namespace is used as is; otherwise an empty namespace is passed.
        """
        if self.args.name is not None:
            self.host.bridge.encryptionNamespaceGet(self.args.name,
                callback=self.encryptionNamespaceGetCb,
                errback=partial(self.errback,
                                msg=_("Can't get encryption namespace: {}"),
                                exit_code=C.EXIT_BRIDGE_ERRBACK,
                                ))
        elif self.args.namespace is not None:
            self.encryptionNamespaceGetCb(self.args.namespace)
        else:
            self.encryptionNamespaceGetCb("")


class EncryptionTrust(base.CommandBase):
    """``trust`` command group; its only subcommand is TrustUI."""
    subcommands = (TrustUI,)

    def __init__(self, host):
        super(EncryptionTrust, self).__init__(
            host, "trust", use_profile=False, help=_("trust management")
        )


class Encryption(base.CommandBase):
    """``encryption`` command group gathering the subcommands of this module."""
    subcommands = (
        EncryptionAlgorithms,
        EncryptionGet,
        EncryptionStart,
        EncryptionStop,
        EncryptionTrust,
    )

    def __init__(self, host):
        super(Encryption, self).__init__(
            host,
            "encryption",
            use_profile=False,
            help=_("encryption sessions handling"),
        )