Mercurial > libervia-backend
view sat/memory/encryption.py @ 2647:1bf7e89fded0
plugin XEP-0060: added singleton ID ("current")
author | Goffi <goffi@goffi.org> |
---|---|
date | Sun, 29 Jul 2018 19:23:01 +0200 |
parents | 712cb4ff3e13 |
children | ebcff5423465 |
line wrap: on
line source
#!/usr/bin/env python2
# -*- coding: utf-8 -*-

# SAT: a jabber client
# Copyright (C) 2009-2018 Jérôme Poisson (goffi@goffi.org)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

from sat.core.i18n import _
from sat.core.constants import Const as C
from sat.core import exceptions
from collections import namedtuple
from sat.core.log import getLogger
log = getLogger(__name__)


# Description of one registered encryption algorithm.
EncryptionPlugin = namedtuple("EncryptionPlugin", ("instance",
                                                   "name",
                                                   "namespace",
                                                   "priority"))


class EncryptionHandler(object):
    """Class to handle encryption sessions for a client"""
    # Plugins able to encrypt messages. This is a class attribute filled by
    # registerPlugin (a classmethod), so the registry is shared by every
    # handler instance. It is kept sorted by ascending ``priority``.
    plugins = []

    def __init__(self, host):
        # ``host`` is accepted but not used by the code of this class.
        self._sessions = {}  # bare_jid ==> encryption_data

    @classmethod
    def registerPlugin(cls, plg_instance, name, namespace, priority=0):
        """Register a plugin handling an encryption algorithm

        @param plg_instance(object): instance of the plugin
            it must have the following methods:
                - startEncryption(jid.JID): start an encryption session with a bare jid
                - stopEncryption(jid.JID): stop an encryption session with a bare jid
        @param name(unicode): human readable name of the encryption algorithm
        @param namespace(unicode): namespace of the encryption algorithm
        @param priority(int): priority of this plugin to encrypt a message when not
            selected manually
        @raise exceptions.ConflictError: a plugin is already registered with
            this namespace
        """
        if any(p.namespace == namespace for p in cls.plugins):
            raise exceptions.ConflictError("A plugin with this namespace already exists!")
        plg = EncryptionPlugin(
            instance=plg_instance,
            name=name,
            namespace=namespace,
            priority=priority)
        cls.plugins.append(plg)
        # NOTE(review): ascending sort means plugins[0] (the automatic choice
        # in start()) is the plugin with the LOWEST priority value; confirm
        # this is the intended ordering.
        cls.plugins.sort(key=lambda p: p.priority)

    def start(self, entity, namespace=None):
        """Start an encrypted session with an entity

        @param entity(jid.JID): entity to start an encrypted session with
            must be a bare jid if the algorithm encrypts for all devices;
            when a resource is set, it is recorded as the only directed device
        @param namespace(unicode, None): namespace of the encryption algorithm to use
            None to select automatically an algorithm
        @raise exceptions.NotFound: no plugin is registered at all, or no
            registered plugin matches ``namespace``
        @raise exceptions.ConflictError: a session already exists for the bare
            jid of ``entity``
        """
        if not self.plugins:
            raise exceptions.NotFound(_(u"No encryption plugin is registered, "
                                        u"an encryption session can't be started"))

        # Sessions are keyed by bare jid, whatever resource was requested.
        bare_jid = entity.userhostJID()
        if bare_jid in self._sessions:
            plg = self._sessions[bare_jid]['plugin']
            msg = (_(u"Session with {bare_jid} is already encrypted with {name}. "
                     u"Please stop encryption session before changing algorithm.")
                   .format(bare_jid=bare_jid, name=plg.name))
            log.warning(msg)
            raise exceptions.ConflictError(msg)

        if namespace is None:
            # automatic selection: first plugin of the priority-sorted list
            plg = self.plugins[0]
        else:
            plg = next((p for p in self.plugins if p.namespace == namespace), None)
            if plg is None:
                raise exceptions.NotFound(_(
                    u"Can't find requested encryption plugin: {namespace}").format(
                        namespace=namespace))

        data = {"plugin": plg}
        if entity.resource:
            # indicate that we encrypt only for some devices
            data['directed_devices'] = [entity.resource]

        self._sessions[bare_jid] = data
        log.info(_(u"Encryption session has been set for {bare_jid} with "
                   u"{encryption_name}").format(
                   bare_jid=bare_jid.userhost(), encryption_name=plg.name))

    ## Triggers ##

    def setEncryptionFlag(self, mess_data):
        """Set the encryption key in mess_data if session with recipient is encrypted

        The session data stored by start() is put under C.MESS_KEY_ENCRYPTION.
        Nothing is set for "groupchat" messages or when there is no session
        for the bare jid of the recipient.
        @param mess_data(dict): message data, "type" and "to" keys are read
        """
        if mess_data["type"] == "groupchat":
            # FIXME: to change when group chat encryption will be handled
            return

        to_jid = mess_data['to']
        encryption = self._sessions.get(to_jid.userhostJID())
        if encryption is not None:
            mess_data[C.MESS_KEY_ENCRYPTION] = encryption