view sat/memory/encryption.py @ 2648:0f76813afc57

plugin XEP-0384: OMEMO implementation first draft: this is the initial implementation of OMEMO encryption using python omemo module. /!\ This implementation is not yet working /!\
author Goffi <goffi@goffi.org>
date Sun, 29 Jul 2018 19:24:21 +0200
parents 712cb4ff3e13
children ebcff5423465
line wrap: on
line source

#!/usr/bin/env python2
# -*- coding: utf-8 -*-

# SAT: a jabber client
# Copyright (C) 2009-2018 Jérôme Poisson (goffi@goffi.org)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

from sat.core.i18n import _
from sat.core.constants import Const as C
from sat.core import exceptions
from collections import namedtuple
from sat.core.log import getLogger
log = getLogger(__name__)


# Record describing one registered encryption plugin: the plugin instance,
# its human readable name, the namespace of its algorithm and its priority
EncryptionPlugin = namedtuple(
    "EncryptionPlugin",
    ["instance", "name", "namespace", "priority"],
)


class EncryptionHandler(object):
    """Class to handle encryption sessions for a client"""
    plugins = []  # plugin able to encrypt messages

    def __init__(self, host):
        self._sessions = {}  # bare_jid ==> encryption_data

    @classmethod
    def registerPlugin(cls, plg_instance, name, namespace, priority=0):
        """Declare a plugin implementing an encryption algorithm

        The plugin is added to the class level list of plugins, which is then sorted
        by priority.

        @param plg_instance(object): instance of the plugin
            it must have the following methods:
                - startEncryption(jid.JID): start an encryption session with a bare jid
                - stopEncryption(jid.JID): stop an encryption session with a bare jid
        @param name(unicode): human readable name of the encryption algorithm
        @param namespace(unicode): namespace of the encryption algorithm
            it must not be used by an already registered plugin
        @param priority(int): priority of this plugin to encrypt a message when not
            selected manually
        @raise exceptions.ConflictError: a plugin is already registered with this
            namespace
        """
        # a namespace identifies a single plugin, duplicates are refused
        if namespace in (known.namespace for known in cls.plugins):
            raise exceptions.ConflictError("A plugin with this namespace already exists!")
        cls.plugins.append(EncryptionPlugin(plg_instance, name, namespace, priority))
        # plugins are kept ordered by ascending priority value; list.sort is stable, so
        # plugins with the same priority stay in registration order
        # NOTE(review): "start" uses plugins[0] when no namespace is requested, i.e. the
        #   plugin with the lowest priority value — confirm this is the intended order
        cls.plugins.sort(key=lambda registered: registered.priority)

    def start(self, entity, namespace=None):
        """Start an encrypted session with an entity

        The session is stored under the bare jid of the entity, so only one session
        can exist per bare jid, whatever the resource.

        @param entity(jid.JID): entity to start an encrypted session with
            must be a bare jid if the algorithm encrypts for all devices
            if a resource is set, it is stored as the only directed device
        @param namespace(unicode, None): namespace of the encryption algorithm to use
            None to select automatically an algorithm (the first plugin of the
            registered plugins list is then used)
        @raise exceptions.NotFound: no plugin is registered at all, or no registered
            plugin uses the requested namespace
        @raise exceptions.ConflictError: a session already exists with this entity
        """
        if not self.plugins:
            raise exceptions.NotFound(_(u"No encryption plugin is registered, "
                                        u"an encryption session can't be started"))

        bare_jid = entity.userhostJID()
        if bare_jid in self._sessions:
            plg = self._sessions[bare_jid]['plugin']

            # the session must be stopped explicitly before another one can be started
            msg = (_(u"Session with {bare_jid} is already encrypted with {name}. "
                     u"Please stop encryption session before changing algorithm.")
                   .format(bare_jid=bare_jid.userhost(), name=plg.name))
            log.warning(msg)
            raise exceptions.ConflictError(msg)

        if namespace is None:
            # no explicit choice: the first plugin of the list is used
            plg = self.plugins[0]
        else:
            try:
                plg = next(p for p in self.plugins if p.namespace == namespace)
            except StopIteration:
                raise exceptions.NotFound(_(
                    u"Can't find requested encryption plugin: {namespace}").format(
                        namespace=namespace))

        data = {"plugin": plg}
        if entity.resource:
            # indicate that we encrypt only for some devices
            data['directed_devices'] = [entity.resource]

        self._sessions[bare_jid] = data
        log.info(_(u"Encryption session has been set for {bare_jid} with "
                   u"{encryption_name}").format(
                   bare_jid=bare_jid.userhost(), encryption_name=plg.name))

    ## Triggers ##

    def setEncryptionFlag(self, mess_data):
        """Set "encryption" key in mess_data if session with destinee is encrypted"""

        if mess_data["type"] == "groupchat":
            # FIXME: to change when group chat encryption will be handled
            return

        to_jid = mess_data['to']
        encryption = self._sessions.get(to_jid.userhostJID())
        if encryption is not None:
            mess_data[C.MESS_KEY_ENCRYPTION] = encryption