comparison libervia/server/server.py @ 1274:eb4f03da0d7d

server: re-usable Twisted TLS code has been moved to SàT backend
author Goffi <goffi@goffi.org>
date Fri, 29 May 2020 21:56:42 +0200
parents e628724530ec
children 334d044f2713
comparison
equal deleted inserted replaced
1273:10748aa888a9 1274:eb4f03da0d7d
50 from sat.tools.common import regex 50 from sat.tools.common import regex
51 from sat.tools.common import template 51 from sat.tools.common import template
52 from sat.tools.common import uri as common_uri 52 from sat.tools.common import uri as common_uri
53 from sat.tools.common.utils import recursive_update 53 from sat.tools.common.utils import recursive_update
54 from sat.tools.common import data_format 54 from sat.tools.common import data_format
55 from sat.tools.common import tls
55 import libervia 56 import libervia
56 from libervia.server import websockets 57 from libervia.server import websockets
57 from libervia.server.pages import LiberviaPage 58 from libervia.server.pages import LiberviaPage
58 from libervia.server.utils import quote, ProgressHandler 59 from libervia.server.utils import quote, ProgressHandler
59 from libervia.server.tasks.manager import TasksManager 60 from libervia.server.tasks.manager import TasksManager
60 from functools import partial 61 from functools import partial
61
62 try:
63 import OpenSSL
64 from twisted.internet import ssl
65 except ImportError:
66 ssl = None
67 62
68 from libervia.server.constants import Const as C 63 from libervia.server.constants import Const as C
69 from libervia.server import session_iface 64 from libervia.server import session_iface
70 65
71 log = getLogger(__name__) 66 log = getLogger(__name__)
1540 fmt_date = "{day_name}, %d {month_name} %Y %H:%M:%S GMT".format( 1535 fmt_date = "{day_name}, %d {month_name} %Y %H:%M:%S GMT".format(
1541 day_name=C.HTTP_DAYS[now.tm_wday], month_name=C.HTTP_MONTH[now.tm_mon - 1] 1536 day_name=C.HTTP_DAYS[now.tm_wday], month_name=C.HTTP_MONTH[now.tm_mon - 1]
1542 ) 1537 )
1543 return time.strftime(fmt_date, now) 1538 return time.strftime(fmt_date, now)
1544 1539
1545 ## TLS related methods ##
1546
1547 def _TLSOptionsCheck(self):
1548 """Check options coherence if TLS is activated, and update missing values
1549
1550 Must be called only if TLS is activated
1551 """
1552 if not self.options["tls_certificate"]:
1553 log.error("a TLS certificate is needed to activate HTTPS connection")
1554 self.quit(1)
1555 if not self.options["tls_private_key"]:
1556 self.options["tls_private_key"] = self.options["tls_certificate"]
1557
1558 if not self.options["tls_private_key"]:
1559 self.options["tls_private_key"] = self.options["tls_certificate"]
1560
1561 def _loadCertificates(self, f):
1562 """Read a .pem file with a list of certificates
1563
1564 @param f (file): file obj (opened .pem file)
1565 @return (list[OpenSSL.crypto.X509]): list of certificates
1566 @raise OpenSSL.crypto.Error: error while parsing the file
1567 """
1568 # XXX: didn't found any method to load a .pem file with several certificates
1569 # so the certificates split is done here
1570 certificates = []
1571 buf = []
1572 while True:
1573 line = f.readline()
1574 buf.append(line)
1575 if "-----END CERTIFICATE-----" in line:
1576 certificates.append(
1577 OpenSSL.crypto.load_certificate(
1578 OpenSSL.crypto.FILETYPE_PEM, "".join(buf)
1579 )
1580 )
1581 buf = []
1582 elif not line:
1583 log.debug("{} certificate(s) found".format(len(certificates)))
1584 return certificates
1585
1586 def _loadPKey(self, f):
1587 """Read a private key from a .pem file
1588
1589 @param f (file): file obj (opened .pem file)
1590 @return (list[OpenSSL.crypto.PKey]): private key object
1591 @raise OpenSSL.crypto.Error: error while parsing the file
1592 """
1593 return OpenSSL.crypto.load_privatekey(OpenSSL.crypto.FILETYPE_PEM, f.read())
1594
1595 def _loadCertificate(self, f):
1596 """Read a public certificate from a .pem file
1597
1598 @param f (file): file obj (opened .pem file)
1599 @return (list[OpenSSL.crypto.X509]): public certificate
1600 @raise OpenSSL.crypto.Error: error while parsing the file
1601 """
1602 return OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, f.read())
1603
1604 def _getTLSContextFactory(self):
1605 """Load TLS certificate and build the context factory needed for listenSSL"""
1606 if ssl is None:
1607 raise ImportError("Python module pyOpenSSL is not installed!")
1608
1609 cert_options = {}
1610
1611 for name, option, method in [
1612 ("privateKey", "tls_private_key", self._loadPKey),
1613 ("certificate", "tls_certificate", self._loadCertificate),
1614 ("extraCertChain", "tls_chain", self._loadCertificates),
1615 ]:
1616 path = self.options[option]
1617 if not path:
1618 assert option == "tls_chain"
1619 continue
1620 log.debug("loading {option} from {path}".format(option=option, path=path))
1621 try:
1622 with open(path) as f:
1623 cert_options[name] = method(f)
1624 except IOError as e:
1625 log.error(
1626 "Error while reading file {path} for option {option}: {error}".format(
1627 path=path, option=option, error=e
1628 )
1629 )
1630 self.quit(2)
1631 except OpenSSL.crypto.Error:
1632 log.error(
1633 "Error while parsing file {path} for option {option}, are you sure "
1634 "it is a valid .pem file?".format( path=path, option=option))
1635 if (
1636 option == "tls_private_key"
1637 and self.options["tls_certificate"] == path
1638 ):
1639 log.error(
1640 "You are using the same file for private key and public "
1641 "certificate, make sure that both a in {path} or use "
1642 "--tls_private_key option".format(path=path))
1643 self.quit(2)
1644
1645 return ssl.CertificateOptions(**cert_options)
1646
1647 ## service management ## 1540 ## service management ##
1648 1541
1649 def _startService(self, __=None): 1542 def _startService(self, __=None):
1650 """Actually start the HTTP(S) server(s) after the profile for Libervia is connected. 1543 """Actually start the HTTP(S) server(s) after the profile for Libervia is connected.
1651 1544
1660 ProtectedFile(cache_dir)) 1553 ProtectedFile(cache_dir))
1661 self.service_cache_url = "/" + os.path.join(C.CACHE_DIR, service_path) 1554 self.service_cache_url = "/" + os.path.join(C.CACHE_DIR, service_path)
1662 session_iface.SATSession.service_cache_url = self.service_cache_url 1555 session_iface.SATSession.service_cache_url = self.service_cache_url
1663 1556
1664 if self.options["connection_type"] in ("https", "both"): 1557 if self.options["connection_type"] in ("https", "both"):
1665 self._TLSOptionsCheck() 1558 try:
1666 context_factory = self._getTLSContextFactory() 1559 tls.TLSOptionsCheck(self.options)
1560 context_factory = tls.getTLSContextFactory(self.options)
1561 except exceptions.ConfigError as e:
1562 log.warning(
1563 f"There is a problem in TLS settings in your configuration file: {e}")
1564 self.quit(2)
1565 except exceptions.DataError as e:
1566 log.warning(
1567 f"Can't set TLS: {e}")
1568 self.quit(1)
1667 reactor.listenSSL(self.options["port_https"], self.site, context_factory) 1569 reactor.listenSSL(self.options["port_https"], self.site, context_factory)
1668 if self.options["connection_type"] in ("http", "both"): 1570 if self.options["connection_type"] in ("http", "both"):
1669 if ( 1571 if (
1670 self.options["connection_type"] == "both" 1572 self.options["connection_type"] == "both"
1671 and self.options["redirect_to_https"] 1573 and self.options["redirect_to_https"]