changeset 1274:eb4f03da0d7d

server: re-usable Twisted TLS code has been moved to SàT backend
author Goffi <goffi@goffi.org>
date Fri, 29 May 2020 21:56:42 +0200
parents 10748aa888a9
children 334d044f2713
files libervia/server/server.py
diffstat 1 files changed, 12 insertions(+), 110 deletions(-) [+]
line wrap: on
line diff
--- a/libervia/server/server.py	Tue May 26 12:33:09 2020 +0200
+++ b/libervia/server/server.py	Fri May 29 21:56:42 2020 +0200
@@ -52,6 +52,7 @@
 from sat.tools.common import uri as common_uri
 from sat.tools.common.utils import recursive_update
 from sat.tools.common import data_format
+from sat.tools.common import tls
 import libervia
 from libervia.server import websockets
 from libervia.server.pages import LiberviaPage
@@ -59,12 +60,6 @@
 from libervia.server.tasks.manager import TasksManager
 from functools import partial
 
-try:
-    import OpenSSL
-    from twisted.internet import ssl
-except ImportError:
-    ssl = None
-
 from libervia.server.constants import Const as C
 from libervia.server import session_iface
 
@@ -1542,108 +1537,6 @@
         )
         return time.strftime(fmt_date, now)
 
-    ## TLS related methods ##
-
-    def _TLSOptionsCheck(self):
-        """Check options coherence if TLS is activated, and update missing values
-
-        Must be called only if TLS is activated
-        """
-        if not self.options["tls_certificate"]:
-            log.error("a TLS certificate is needed to activate HTTPS connection")
-            self.quit(1)
-        if not self.options["tls_private_key"]:
-            self.options["tls_private_key"] = self.options["tls_certificate"]
-
-        if not self.options["tls_private_key"]:
-            self.options["tls_private_key"] = self.options["tls_certificate"]
-
-    def _loadCertificates(self, f):
-        """Read a .pem file with a list of certificates
-
-        @param f (file): file obj (opened .pem file)
-        @return (list[OpenSSL.crypto.X509]): list of certificates
-        @raise OpenSSL.crypto.Error: error while parsing the file
-        """
-        # XXX: didn't found any method to load a .pem file with several certificates
-        #      so the certificates split is done here
-        certificates = []
-        buf = []
-        while True:
-            line = f.readline()
-            buf.append(line)
-            if "-----END CERTIFICATE-----" in line:
-                certificates.append(
-                    OpenSSL.crypto.load_certificate(
-                        OpenSSL.crypto.FILETYPE_PEM, "".join(buf)
-                    )
-                )
-                buf = []
-            elif not line:
-                log.debug("{} certificate(s) found".format(len(certificates)))
-                return certificates
-
-    def _loadPKey(self, f):
-        """Read a private key from a .pem file
-
-        @param f (file): file obj (opened .pem file)
-        @return (list[OpenSSL.crypto.PKey]): private key object
-        @raise OpenSSL.crypto.Error: error while parsing the file
-        """
-        return OpenSSL.crypto.load_privatekey(OpenSSL.crypto.FILETYPE_PEM, f.read())
-
-    def _loadCertificate(self, f):
-        """Read a public certificate from a .pem file
-
-        @param f (file): file obj (opened .pem file)
-        @return (list[OpenSSL.crypto.X509]): public certificate
-        @raise OpenSSL.crypto.Error: error while parsing the file
-        """
-        return OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, f.read())
-
-    def _getTLSContextFactory(self):
-        """Load TLS certificate and build the context factory needed for listenSSL"""
-        if ssl is None:
-            raise ImportError("Python module pyOpenSSL is not installed!")
-
-        cert_options = {}
-
-        for name, option, method in [
-            ("privateKey", "tls_private_key", self._loadPKey),
-            ("certificate", "tls_certificate", self._loadCertificate),
-            ("extraCertChain", "tls_chain", self._loadCertificates),
-        ]:
-            path = self.options[option]
-            if not path:
-                assert option == "tls_chain"
-                continue
-            log.debug("loading {option} from {path}".format(option=option, path=path))
-            try:
-                with open(path) as f:
-                    cert_options[name] = method(f)
-            except IOError as e:
-                log.error(
-                    "Error while reading file {path} for option {option}: {error}".format(
-                        path=path, option=option, error=e
-                    )
-                )
-                self.quit(2)
-            except OpenSSL.crypto.Error:
-                log.error(
-                    "Error while parsing file {path} for option {option}, are you sure "
-                    "it is a valid .pem file?".format( path=path, option=option))
-                if (
-                    option == "tls_private_key"
-                    and self.options["tls_certificate"] == path
-                ):
-                    log.error(
-                        "You are using the same file for private key and public "
-                        "certificate, make sure that both a in {path} or use "
-                        "--tls_private_key option".format(path=path))
-                self.quit(2)
-
-        return ssl.CertificateOptions(**cert_options)
-
     ## service management ##
 
     def _startService(self, __=None):
@@ -1662,8 +1555,17 @@
         session_iface.SATSession.service_cache_url = self.service_cache_url
 
         if self.options["connection_type"] in ("https", "both"):
-            self._TLSOptionsCheck()
-            context_factory = self._getTLSContextFactory()
+            try:
+                tls.TLSOptionsCheck(self.options)
+                context_factory = tls.getTLSContextFactory(self.options)
+            except exceptions.ConfigError as e:
+                log.warning(
+                    f"There is a problem in TLS settings in your configuration file: {e}")
+                self.quit(2)
+            except exceptions.DataError as e:
+                log.warning(
+                    f"Can't set TLS: {e}")
+                self.quit(1)
             reactor.listenSSL(self.options["port_https"], self.site, context_factory)
         if self.options["connection_type"] in ("http", "both"):
             if (