diff src/server/server.py @ 813:6e27604ec95a

server: added --tls_private_key and --tls_chain options. --tls_certificate .pem file will be used for private_key if --tls_private_key is not specified.
author Goffi <goffi@goffi.org>
date Sun, 20 Dec 2015 20:01:42 +0100
parents fd6965c16e7e
children cf1812a4445e
line wrap: on
line diff
--- a/src/server/server.py	Sun Dec 20 20:01:40 2015 +0100
+++ b/src/server/server.py	Sun Dec 20 20:01:42 2015 +0100
@@ -51,9 +51,8 @@
 try:
     import OpenSSL
     from twisted.internet import ssl
-    ssl_available = True
-except:
-    ssl_available = False
+except ImportError:
+    ssl = None
 
 from libervia.server.constants import Const as C
 from libervia.server.blog import MicroBlog
@@ -1343,25 +1342,104 @@
 
         self.initialised.addCallback(initOk)
 
+    ## TLS related methods ##
+
+    def _TLSOptionsCheck(self):
+        """Check options coherence if TLS is activated, and update missing values
+
+        Must be called only if TLS is activated
+        """
+        if not self.options['tls_certificate']:
+            log.error(u"a TLS certificate is needed to activate HTTPS connection")
+            self.quit(1)
+        if not self.options['tls_private_key']:
+            self.options['tls_private_key'] = self.options['tls_certificate']
+
+
+        if not self.options['tls_private_key']:
+            self.options['tls_private_key'] = self.options['tls_certificate']
+
+    def _loadCertificates(self, f):
+        """Read a .pem file with a list of certificates
+
+        @param f (file): file obj (opened .pem file)
+        @return (list[OpenSSL.crypto.X509]): list of certificates
+        @raise OpenSSL.crypto.Error: error while parsing the file
+        """
+        # XXX: didn't found any method to load a .pem file with several certificates
+        #      so the certificates split is done here
+        certificates = []
+        buf = []
+        while True:
+            line = f.readline()
+            buf.append(line)
+            if '-----END CERTIFICATE-----' in line:
+                certificates.append(OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, ''.join(buf)))
+                buf=[]
+            elif not line:
+                log.debug(u"{} certificate(s) found".format(len(certificates)))
+                return certificates
+
+    def _loadPKey(self, f):
+        """Read a private key from a .pem file
+
+        @param f (file): file obj (opened .pem file)
+        @return (list[OpenSSL.crypto.PKey]): private key object
+        @raise OpenSSL.crypto.Error: error while parsing the file
+        """
+        return OpenSSL.crypto.load_privatekey(OpenSSL.crypto.FILETYPE_PEM, f.read())
+
+    def _loadCertificate(self, f):
+        """Read a public certificate from a .pem file
+
+        @param f (file): file obj (opened .pem file)
+        @return (list[OpenSSL.crypto.X509]): public certificate
+        @raise OpenSSL.crypto.Error: error while parsing the file
+        """
+        return OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, f.read())
+
+    def _getTLSContextFactory(self):
+        """Load TLS certificate and build the context factory needed for listenSSL"""
+        if ssl is None:
+            raise ImportError(u"Python module pyOpenSSL is not installed!")
+
+        cert_options = {}
+
+        for name, option, method in [('privateKey', 'tls_private_key', self._loadPKey),
+                                    ('certificate', 'tls_certificate', self._loadCertificate),
+                                    ('extraCertChain', 'tls_chain', self._loadCertificates)]:
+            path = self.options[option]
+            if not path:
+                assert option=='tls_chain'
+                continue
+            log.debug(u"loading {option} from {path}".format(option=option, path=path))
+            try:
+                with open(path) as f:
+                    cert_options[name] = method(f)
+            except IOError as e:
+                log.error(u"Error while reading file {path} for option {option}: {error}".format(path=path, option=option, error=e))
+                self.quit(2)
+            except OpenSSL.crypto.Error:
+                log.error(u"Error while parsing file {path} for option {option}, are you sure it is a valid .pem file?".format(path=path, option=option))
+                if option=='tls_private_key' and self.options['tls_certificate'] == path:
+                    log.error(u"You are using the same file for private key and public certificate, make sure that both a in {path} or use --tls_private_key option".format(path=path))
+                self.quit(2)
+
+        return ssl.CertificateOptions(**cert_options)
+
+    ## service management ##
+
     def _startService(self, dummy=None):
         """Actually start the HTTP(S) server(s) after the profile for Libervia is connected.
+
+        @raise ImportError: OpenSSL is not available
         @raise IOError: the certificate file doesn't exist
         @raise OpenSSL.crypto.Error: the certificate file is invalid
         """
         if self.options['connection_type'] in ('https', 'both'):
-            if ssl is None:
-                raise ImportError(u"Python module pyOpenSSL is not installed!")
-            try:
-                with open(os.path.expanduser(self.options['ssl_certificate'])) as keyAndCert:
-                    try:
-                        cert = ssl.PrivateCertificate.loadPEM(keyAndCert.read())
-                    except OpenSSL.crypto.Error as e:
-                        log.error(_(u"The file '%s' must contain both private and public parts of the certificate") % self.options['ssl_certificate'])
-                        raise e
-            except IOError as e:
-                log.error(_(u"The file '%s' doesn't exist") % self.options['ssl_certificate'])
-                raise e
-            reactor.listenSSL(self.options['port_https'], self.site, cert.options())
+            self._TLSOptionsCheck()
+            context_factory = self._getTLSContextFactory()
+            reactor.listenSSL(self.options['port_https'], self.site, context_factory)
         if self.options['connection_type'] in ('http', 'both'):
             if self.options['connection_type'] == 'both' and self.options['redirect_to_https']:
                 reactor.listenTCP(self.options['port'], server.Site(RedirectToHTTPS(self.options['port'], self.options['port_https_ext'])))