Mercurial > libervia-backend
diff libervia/backend/tools/common/tls.py @ 4071:4b842c1fb686
refactoring: renamed `sat` package to `libervia.backend`
author | Goffi <goffi@goffi.org> |
---|---|
date | Fri, 02 Jun 2023 11:49:51 +0200 |
parents | sat/tools/common/tls.py@524856bd7b19 |
children | 0d7bb4df2343 |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/libervia/backend/tools/common/tls.py Fri Jun 02 11:49:51 2023 +0200 @@ -0,0 +1,140 @@ +#!/usr/bin/env python3 + +# Libervia: an XMPP client +# Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org) + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +"""TLS handling with twisted""" + +from libervia.backend.core.log import getLogger +from libervia.backend.core import exceptions +from libervia.backend.tools import config as tools_config + + +try: + import OpenSSL + from twisted.internet import ssl +except ImportError: + ssl = None + + +log = getLogger(__name__) + + +def get_options_from_config(config, section=""): + options = {} + for option in ('tls_certificate', 'tls_private_key', 'tls_chain'): + options[option] = tools_config.config_get(config, section, option) + return options + + +def tls_options_check(options): + """Check options coherence if TLS is activated, and update missing values + + Must be called only if TLS is activated + """ + if not options["tls_certificate"]: + raise exceptions.ConfigError( + "a TLS certificate is needed to activate HTTPS connection") + if not options["tls_private_key"]: + options["tls_private_key"] = options["tls_certificate"] + + +def load_certificates(f): + """Read a .pem file with a list of certificates + + @param f (file): file obj (opened .pem file) + @return (list[OpenSSL.crypto.X509]): list of certificates + @raise OpenSSL.crypto.Error: error while parsing the file + """ + # XXX: didn't found any method to load a .pem file with several certificates + # so the certificates split is done here + certificates = [] + buf = [] + while True: + line = f.readline() + buf.append(line) + if "-----END CERTIFICATE-----" in line: + certificates.append( + OpenSSL.crypto.load_certificate( + OpenSSL.crypto.FILETYPE_PEM, "".join(buf) + ) + ) + buf = [] + elif not line: + log.debug(f"{len(certificates)} certificate(s) found") + return certificates + + +def load_p_key(f): + """Read a private key from a .pem file + + @param f (file): file obj (opened .pem file) + @return (list[OpenSSL.crypto.PKey]): private key object + @raise OpenSSL.crypto.Error: error while parsing the file + """ + return OpenSSL.crypto.load_privatekey(OpenSSL.crypto.FILETYPE_PEM, f.read()) + + +def load_certificate(f): + """Read a public certificate from a .pem file + + @param f (file): file obj (opened .pem file) + @return (list[OpenSSL.crypto.X509]): public certificate + @raise OpenSSL.crypto.Error: error while parsing the file + """ + return OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, f.read()) + + +def get_tls_context_factory(options): + """Load TLS certificate and build the context factory needed for listenSSL""" + if ssl is None: + raise ImportError("Python module pyOpenSSL is not installed!") + + cert_options = {} + + for name, option, method in [ + ("privateKey", "tls_private_key", load_p_key), + ("certificate", "tls_certificate", load_certificate), + ("extraCertChain", "tls_chain", load_certificates), + ]: + path = options[option] + if not path: + assert option == "tls_chain" + continue + log.debug(f"loading {option} from {path}") + try: + with open(path) as f: + cert_options[name] = method(f) + except IOError as e: + raise exceptions.DataError( + f"Error while reading file {path} for option {option}: {e}" + ) + except OpenSSL.crypto.Error: + raise exceptions.DataError( + f"Error while parsing file {path} for option {option}, are you sure " + f"it is a valid .pem file?" + ) + if ( + option == "tls_private_key" + and options["tls_certificate"] == path + ): + raise exceptions.ConfigError( + f"You are using the same file for private key and public " + f"certificate, make sure that both a in {path} or use " + f"--tls_private_key option" + ) + + return ssl.CertificateOptions(**cert_options)