Mercurial > libervia-backend
view libervia/backend/tools/common/tls.py @ 4141:ba8ddfdd334f
cli (loops): run GLib loop in same thread as asyncio:
use the new `install_glib_asyncio_iteration` to run GLib in the same thread as asyncio.
rel 426
author | Goffi <goffi@goffi.org> |
---|---|
date | Wed, 01 Nov 2023 14:05:53 +0100 |
parents | 4b842c1fb686 |
children | 0d7bb4df2343 |
line wrap: on
line source
#!/usr/bin/env python3 # Libervia: an XMPP client # Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org) # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. """TLS handling with twisted""" from libervia.backend.core.log import getLogger from libervia.backend.core import exceptions from libervia.backend.tools import config as tools_config try: import OpenSSL from twisted.internet import ssl except ImportError: ssl = None log = getLogger(__name__) def get_options_from_config(config, section=""): options = {} for option in ('tls_certificate', 'tls_private_key', 'tls_chain'): options[option] = tools_config.config_get(config, section, option) return options def tls_options_check(options): """Check options coherence if TLS is activated, and update missing values Must be called only if TLS is activated """ if not options["tls_certificate"]: raise exceptions.ConfigError( "a TLS certificate is needed to activate HTTPS connection") if not options["tls_private_key"]: options["tls_private_key"] = options["tls_certificate"] def load_certificates(f): """Read a .pem file with a list of certificates @param f (file): file obj (opened .pem file) @return (list[OpenSSL.crypto.X509]): list of certificates @raise OpenSSL.crypto.Error: error while parsing the file """ # XXX: didn't found any method to load a .pem file with several certificates # so the certificates split is done here certificates = [] buf = [] while True: line = f.readline() buf.append(line) if "-----END CERTIFICATE-----" in line: certificates.append( OpenSSL.crypto.load_certificate( OpenSSL.crypto.FILETYPE_PEM, "".join(buf) ) ) buf = [] elif not line: log.debug(f"{len(certificates)} certificate(s) found") return certificates def load_p_key(f): """Read a private key from a .pem file @param f (file): file obj (opened .pem file) @return (list[OpenSSL.crypto.PKey]): private key object @raise OpenSSL.crypto.Error: error while parsing the file """ return OpenSSL.crypto.load_privatekey(OpenSSL.crypto.FILETYPE_PEM, f.read()) def load_certificate(f): """Read a public certificate from a .pem file @param f (file): file obj (opened .pem file) @return (list[OpenSSL.crypto.X509]): public certificate @raise OpenSSL.crypto.Error: error while parsing the file """ return OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, f.read()) def get_tls_context_factory(options): """Load TLS certificate and build the context factory needed for listenSSL""" if ssl is None: raise ImportError("Python module pyOpenSSL is not installed!") cert_options = {} for name, option, method in [ ("privateKey", "tls_private_key", load_p_key), ("certificate", "tls_certificate", load_certificate), ("extraCertChain", "tls_chain", load_certificates), ]: path = options[option] if not path: assert option == "tls_chain" continue log.debug(f"loading {option} from {path}") try: with open(path) as f: cert_options[name] = method(f) except IOError as e: raise exceptions.DataError( f"Error while reading file {path} for option {option}: {e}" ) except OpenSSL.crypto.Error: raise exceptions.DataError( f"Error while parsing file {path} for option {option}, are you sure " f"it is a valid .pem file?" ) if ( option == "tls_private_key" and options["tls_certificate"] == path ): raise exceptions.ConfigError( f"You are using the same file for private key and public " f"certificate, make sure that both a in {path} or use " f"--tls_private_key option" ) return ssl.CertificateOptions(**cert_options)