diff sat/memory/crypto.py @ 3160:330a5f1d9eea

core (memory/crypto): replaced `PyCrypto` by `cryptography`: `PyCrypto` is unmaintained for years but was used in SàT for password hashing. This patch fixes that by replacing `PyCrypto` by the reference `cryptography` module which is well maintained. The behaviour stays the same (except that previously async `hash`, `encrypt` and `decrypt` methods are now synchronous, as they are quick and using a deferToThread may actually be more resource intensive than using blocking methods). It is planed to improve `memory.crypto` by using more up-to-date cryptography/hashing algorithms in the future. PyCrypto is no more a dependency of SàT
author Goffi <goffi@goffi.org>
date Sun, 09 Feb 2020 23:50:26 +0100
parents 559a625a236b
children be6d91572633
line wrap: on
line diff
--- a/sat/memory/crypto.py	Sun Feb 09 23:50:21 2020 +0100
+++ b/sat/memory/crypto.py	Sun Feb 09 23:50:26 2020 +0100
@@ -1,6 +1,5 @@
 #!/usr/bin/env python3
 
-
 # SAT: a jabber client
 # Copyright (C) 2009-2020 Jérôme Poisson (goffi@goffi.org)
 # Copyright (C) 2013-2016 Adrien Cossa (souliane@mailoo.org)
@@ -18,26 +17,25 @@
 # You should have received a copy of the GNU Affero General Public License
 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
-try:
-    from Crypto.Cipher import AES
-    from Crypto.Protocol.KDF import PBKDF2
-except ImportError:
-    raise Exception("PyCrypto is not installed.")
-
 from os import urandom
 from base64 import b64encode, b64decode
-from twisted.internet.threads import deferToThread
-from twisted.internet.defer import succeed
+from cryptography.hazmat.primitives import hashes
+from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
+from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
+from cryptography.hazmat.backends import default_backend
+
+
+crypto_backend = default_backend()
 
 
-class BlockCipher(object):
+class BlockCipher:
 
-    BLOCK_SIZE = AES.block_size  # 16 bits
-    MAX_KEY_SIZE = AES.key_size[-1]  # 32 bits = AES-256
+    BLOCK_SIZE = 16
+    MAX_KEY_SIZE = 32
     IV_SIZE = BLOCK_SIZE  # initialization vector size, 16 bits
 
-    @classmethod
-    def encrypt(cls, key, text, leave_empty=True):
+    @staticmethod
+    def encrypt(key, text, leave_empty=True):
         """Encrypt a message.
 
         Based on http://stackoverflow.com/a/12525165
@@ -48,22 +46,22 @@
         @return (D(str)): base-64 encoded encrypted message
         """
         if leave_empty and text == "":
-            return succeed(text)
+            return ""
         iv = BlockCipher.getRandomKey()
-        key = key.encode("utf-8")
+        key = key.encode()
         key = (
             key[: BlockCipher.MAX_KEY_SIZE]
             if len(key) >= BlockCipher.MAX_KEY_SIZE
             else BlockCipher.pad(key)
         )
-        cipher = AES.new(key, AES.MODE_CFB, iv)
-        d = deferToThread(cipher.encrypt, BlockCipher.pad(text.encode("utf-8")))
-        d.addCallback(lambda ciphertext: b64encode(iv + ciphertext))
-        d.addCallback(lambda bytes_cypher: bytes_cypher.decode('utf-8'))
-        return d
 
-    @classmethod
-    def decrypt(cls, key, ciphertext, leave_empty=True):
+        cipher = Cipher(algorithms.AES(key), modes.CFB8(iv), backend=crypto_backend)
+        encryptor = cipher.encryptor()
+        encrypted = encryptor.update(BlockCipher.pad(text.encode())) + encryptor.finalize()
+        return b64encode(iv + encrypted).decode()
+
+    @staticmethod
+    def decrypt(key, ciphertext, leave_empty=True):
         """Decrypt a message.
 
         Based on http://stackoverflow.com/a/12525165
@@ -74,30 +72,26 @@
         @return: Deferred: str or None if the password could not be decrypted
         """
         if leave_empty and ciphertext == "":
-            return succeed("")
+            return ""
         ciphertext = b64decode(ciphertext)
         iv, ciphertext = (
             ciphertext[: BlockCipher.IV_SIZE],
             ciphertext[BlockCipher.IV_SIZE :],
         )
-        key = key.encode("utf-8")
+        key = key.encode()
         key = (
             key[: BlockCipher.MAX_KEY_SIZE]
             if len(key) >= BlockCipher.MAX_KEY_SIZE
             else BlockCipher.pad(key)
         )
-        cipher = AES.new(key, AES.MODE_CFB, iv)
-        d = deferToThread(cipher.decrypt, ciphertext)
-        d.addCallback(lambda text: BlockCipher.unpad(text))
-        # XXX: cipher.decrypt gives no way to make the distinction between
-        # a decrypted empty value and a decryption failure... both return
-        # the empty value. Fortunately, we detect empty passwords beforehand
-        # thanks to the "leave_empty" parameter which is used by default.
-        d.addCallback(lambda text: text if text else None)
-        return d
 
-    @classmethod
-    def getRandomKey(cls, size=None, base64=False):
+        cipher = Cipher(algorithms.AES(key), modes.CFB8(iv), backend=crypto_backend)
+        decryptor = cipher.decryptor()
+        decrypted = decryptor.update(ciphertext) + decryptor.finalize()
+        return BlockCipher.unpad(decrypted)
+
+    @staticmethod
+    def getRandomKey(size=None, base64=False):
         """Return a random key suitable for block cipher encryption.
 
         Note: a good value for the key length is to make it as long as the block size.
@@ -111,25 +105,25 @@
         key = urandom(size)
         return b64encode(key) if base64 else key
 
-    @classmethod
-    def pad(self, s):
+    @staticmethod
+    def pad(s):
         """Method from http://stackoverflow.com/a/12525165"""
         bs = BlockCipher.BLOCK_SIZE
-        return s + (bs - len(s) % bs) * (chr(bs - len(s) % bs)).encode('utf-8')
+        return s + (bs - len(s) % bs) * (chr(bs - len(s) % bs)).encode()
 
-    @classmethod
-    def unpad(self, s):
+    @staticmethod
+    def unpad(s):
         """Method from http://stackoverflow.com/a/12525165"""
-        s = s.decode('utf-8')
+        s = s.decode()
         return s[0 : -ord(s[-1])]
 
 
-class PasswordHasher(object):
+class PasswordHasher:
 
     SALT_LEN = 16  # 128 bits
 
-    @classmethod
-    def hash(cls, password, salt=None, leave_empty=True):
+    @staticmethod
+    def hash(password, salt=None, leave_empty=True):
         """Hash a password.
 
         @param password (str): the password to hash
@@ -138,33 +132,39 @@
         @return: Deferred: base-64 encoded str
         """
         if leave_empty and password == "":
-            return succeed("")
+            return ""
         salt = (
             b64decode(salt)[: PasswordHasher.SALT_LEN]
             if salt
             else urandom(PasswordHasher.SALT_LEN)
         )
-        d = deferToThread(PBKDF2, password, salt)
-        d.addCallback(lambda hashed: b64encode(salt + hashed))
-        d.addCallback(lambda hashed_bytes: hashed_bytes.decode('utf-8'))
-        return d
 
-    @classmethod
-    def compare_hash(cls, hashed_attempt, hashed):
-        assert isinstance(hashed, str)
-        return hashed_attempt == hashed
+        # we use PyCrypto's PBKDF2 arguments while porting to crytography, to stay
+        # compatible with existing installations. But this is temporary and we need
+        # to update them to more secure values.
+        kdf = PBKDF2HMAC(
+            # FIXME: SHA1() is not secure, it is used here for historical reasons
+            #   and must be changed as soon as possible
+            algorithm=hashes.SHA1(),
+            length=16,
+            salt=salt,
+            iterations=1000,
+            backend=crypto_backend
+        )
+        key = kdf.derive(password.encode())
+        return b64encode(salt + key).decode()
 
-    @classmethod
-    def verify(cls, attempt, hashed):
+    @staticmethod
+    def verify(attempt, pwd_hash):
         """Verify a password attempt.
 
         @param attempt (str): the attempt to check
-        @param hashed (str): the hash of the password
+        @param pwd_hash (str): the hash of the password
         @return: Deferred: boolean
         """
         assert isinstance(attempt, str)
-        assert isinstance(hashed, str)
-        leave_empty = hashed == ""
-        d = PasswordHasher.hash(attempt, hashed, leave_empty)
-        d.addCallback(cls.compare_hash, hashed=hashed)
-        return d
+        assert isinstance(pwd_hash, str)
+        leave_empty = pwd_hash == ""
+        attempt_hash = PasswordHasher.hash(attempt, pwd_hash, leave_empty)
+        assert isinstance(attempt_hash, str)
+        return attempt_hash == pwd_hash