# HG changeset patch # User Goffi # Date 1590782202 -7200 # Node ID eb4f03da0d7db624b092912cb7748e180d2d15da # Parent 10748aa888a9f1a0406108b0149ccdf4c80fdfad server: re-usable Twisted TLS code has been moved to SàT backend diff -r 10748aa888a9 -r eb4f03da0d7d libervia/server/server.py --- a/libervia/server/server.py Tue May 26 12:33:09 2020 +0200 +++ b/libervia/server/server.py Fri May 29 21:56:42 2020 +0200 @@ -52,6 +52,7 @@ from sat.tools.common import uri as common_uri from sat.tools.common.utils import recursive_update from sat.tools.common import data_format +from sat.tools.common import tls import libervia from libervia.server import websockets from libervia.server.pages import LiberviaPage @@ -59,12 +60,6 @@ from libervia.server.tasks.manager import TasksManager from functools import partial -try: - import OpenSSL - from twisted.internet import ssl -except ImportError: - ssl = None - from libervia.server.constants import Const as C from libervia.server import session_iface @@ -1542,108 +1537,6 @@ ) return time.strftime(fmt_date, now) - ## TLS related methods ## - - def _TLSOptionsCheck(self): - """Check options coherence if TLS is activated, and update missing values - - Must be called only if TLS is activated - """ - if not self.options["tls_certificate"]: - log.error("a TLS certificate is needed to activate HTTPS connection") - self.quit(1) - if not self.options["tls_private_key"]: - self.options["tls_private_key"] = self.options["tls_certificate"] - - if not self.options["tls_private_key"]: - self.options["tls_private_key"] = self.options["tls_certificate"] - - def _loadCertificates(self, f): - """Read a .pem file with a list of certificates - - @param f (file): file obj (opened .pem file) - @return (list[OpenSSL.crypto.X509]): list of certificates - @raise OpenSSL.crypto.Error: error while parsing the file - """ - # XXX: didn't found any method to load a .pem file with several certificates - # so the certificates split is done here - certificates = [] - buf = [] - while True: - line = f.readline() - buf.append(line) - if "-----END CERTIFICATE-----" in line: - certificates.append( - OpenSSL.crypto.load_certificate( - OpenSSL.crypto.FILETYPE_PEM, "".join(buf) - ) - ) - buf = [] - elif not line: - log.debug("{} certificate(s) found".format(len(certificates))) - return certificates - - def _loadPKey(self, f): - """Read a private key from a .pem file - - @param f (file): file obj (opened .pem file) - @return (list[OpenSSL.crypto.PKey]): private key object - @raise OpenSSL.crypto.Error: error while parsing the file - """ - return OpenSSL.crypto.load_privatekey(OpenSSL.crypto.FILETYPE_PEM, f.read()) - - def _loadCertificate(self, f): - """Read a public certificate from a .pem file - - @param f (file): file obj (opened .pem file) - @return (list[OpenSSL.crypto.X509]): public certificate - @raise OpenSSL.crypto.Error: error while parsing the file - """ - return OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, f.read()) - - def _getTLSContextFactory(self): - """Load TLS certificate and build the context factory needed for listenSSL""" - if ssl is None: - raise ImportError("Python module pyOpenSSL is not installed!") - - cert_options = {} - - for name, option, method in [ - ("privateKey", "tls_private_key", self._loadPKey), - ("certificate", "tls_certificate", self._loadCertificate), - ("extraCertChain", "tls_chain", self._loadCertificates), - ]: - path = self.options[option] - if not path: - assert option == "tls_chain" - continue - log.debug("loading {option} from {path}".format(option=option, path=path)) - try: - with open(path) as f: - cert_options[name] = method(f) - except IOError as e: - log.error( - "Error while reading file {path} for option {option}: {error}".format( - path=path, option=option, error=e - ) - ) - self.quit(2) - except OpenSSL.crypto.Error: - log.error( - "Error while parsing file {path} for option {option}, are you sure " - "it is a valid .pem file?".format( path=path, option=option)) - if ( - option == "tls_private_key" - and self.options["tls_certificate"] == path - ): - log.error( - "You are using the same file for private key and public " - "certificate, make sure that both a in {path} or use " - "--tls_private_key option".format(path=path)) - self.quit(2) - - return ssl.CertificateOptions(**cert_options) - ## service management ## def _startService(self, __=None): @@ -1662,8 +1555,17 @@ session_iface.SATSession.service_cache_url = self.service_cache_url if self.options["connection_type"] in ("https", "both"): - self._TLSOptionsCheck() - context_factory = self._getTLSContextFactory() + try: + tls.TLSOptionsCheck(self.options) + context_factory = tls.getTLSContextFactory(self.options) + except exceptions.ConfigError as e: + log.warning( + f"There is a problem in TLS settings in your configuration file: {e}") + self.quit(2) + except exceptions.DataError as e: + log.warning( + f"Can't set TLS: {e}") + self.quit(1) reactor.listenSSL(self.options["port_https"], self.site, context_factory) if self.options["connection_type"] in ("http", "both"): if (