Mercurial > libervia-backend
comparison sat/tools/common/tls.py @ 3287:a4b8c9bcfb57
tools/common (tls): moved re-usable Twisted TLS code from Libervia to tools/common
author | Goffi <goffi@goffi.org> |
---|---|
date | Fri, 29 May 2020 21:07:10 +0200 |
parents | |
children | 4dbf9fcbf26d |
comparison
equal
deleted
inserted
replaced
3286:ddf3ded93b78 | 3287:a4b8c9bcfb57 |
---|---|
1 #!/usr/bin/env python3 | |
2 | |
3 # SàT: a XMPP client | |
4 # Copyright (C) 2009-2020 Jérôme Poisson (goffi@goffi.org) | |
5 | |
6 # This program is free software: you can redistribute it and/or modify | |
7 # it under the terms of the GNU Affero General Public License as published by | |
8 # the Free Software Foundation, either version 3 of the License, or | |
9 # (at your option) any later version. | |
10 | |
11 # This program is distributed in the hope that it will be useful, | |
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of | |
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |
14 # GNU Affero General Public License for more details. | |
15 | |
16 # You should have received a copy of the GNU Affero General Public License | |
17 # along with this program. If not, see <http://www.gnu.org/licenses/>. | |
18 | |
19 """TLS handling with twisted""" | |
20 | |
21 from twisted.internet import reactor | |
22 from sat.core.log import getLogger | |
23 from sat.core import exceptions | |
24 from sat.tools import config as tools_config | |
25 | |
26 | |
27 try: | |
28 import OpenSSL | |
29 from twisted.internet import ssl | |
30 except ImportError: | |
31 ssl = None | |
32 | |
33 | |
34 log = getLogger(__name__) | |
35 | |
36 | |
def getOptionsFromConfig(config, section=""):
    """Collect the TLS related options from the configuration

    @param config: configuration object, given as-is to tools_config.getConfig
    @param section (str): configuration section where the options are looked up
    @return (dict): value returned by tools_config.getConfig for each of
        "tls_certificate", "tls_private_key" and "tls_chain"
    """
    names = ('tls_certificate', 'tls_private_key', 'tls_chain')
    return {
        name: tools_config.getConfig(config, section, name)
        for name in names
    }
42 | |
43 | |
def TLSOptionsCheck(options):
    """Check options coherence if TLS is activated, and update missing values

    Must be called only if TLS is activated.
    When no private key is set, the certificate file is used for it too.

    @param options (dict): TLS options, updated in place
    @raise exceptions.ConfigError: no TLS certificate is set
    """
    # .get is used so an absent key is handled like an empty value instead of
    # raising a KeyError
    if not options.get("tls_certificate"):
        raise exceptions.ConfigError(
            "a TLS certificate is needed to activate HTTPS connection")
    if not options.get("tls_private_key"):
        options["tls_private_key"] = options["tls_certificate"]
54 | |
55 | |
def loadCertificates(f):
    """Parse every certificate found in a .pem file

    @param f (file): file obj (opened .pem file)
    @return (list[OpenSSL.crypto.X509]): certificates, in the order they are read
    @raise OpenSSL.crypto.Error: error while parsing the file
    """
    # XXX: no method has been found to load a .pem file holding several
    #   certificates at once, so the file is split here
    found = []
    pending = []
    # readline returns an empty string once the end of the file is reached
    for line in iter(f.readline, ""):
        pending.append(line)
        if "-----END CERTIFICATE-----" not in line:
            continue
        # a full certificate has been accumulated, it can be parsed
        pem = "".join(pending)
        found.append(
            OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, pem)
        )
        pending = []
    log.debug(f"{len(found)} certificate(s) found")
    return found
80 | |
81 | |
def loadPKey(f):
    """Parse the private key stored in a .pem file

    @param f (file): file obj (opened .pem file)
    @return (OpenSSL.crypto.PKey): private key object
    @raise OpenSSL.crypto.Error: error while parsing the file
    """
    crypto = OpenSSL.crypto
    return crypto.load_privatekey(crypto.FILETYPE_PEM, f.read())
90 | |
91 | |
def loadCertificate(f):
    """Parse the public certificate stored in a .pem file

    @param f (file): file obj (opened .pem file)
    @return (OpenSSL.crypto.X509): public certificate
    @raise OpenSSL.crypto.Error: error while parsing the file
    """
    crypto = OpenSSL.crypto
    return crypto.load_certificate(crypto.FILETYPE_PEM, f.read())
100 | |
101 | |
def getTLSContextFactory(options):
    """Load TLS certificate and build the context factory needed for listenSSL

    @param options (dict): TLS options, with paths to .pem files for
        "tls_private_key", "tls_certificate" and "tls_chain".
        Only "tls_chain" may be unset.
    @return (ssl.CertificateOptions): context factory built from the loaded files
    @raise ImportError: pyOpenSSL is not available
    @raise exceptions.ConfigError: a mandatory option is not set, or the private key
        can't be parsed from the file which is also used for the certificate
    @raise exceptions.DataError: a file can't be read or parsed
    """
    if ssl is None:
        raise ImportError("Python module pyOpenSSL is not installed!")

    cert_options = {}

    for name, option, method in [
        ("privateKey", "tls_private_key", loadPKey),
        ("certificate", "tls_certificate", loadCertificate),
        ("extraCertChain", "tls_chain", loadCertificates),
    ]:
        path = options.get(option)
        if not path:
            if option != "tls_chain":
                # explicit check: an assert would be stripped when running with -O
                raise exceptions.ConfigError(
                    f"{option} option must be set to use TLS"
                )
            continue
        log.debug(f"loading {option} from {path}")
        try:
            with open(path) as f:
                cert_options[name] = method(f)
        except IOError as e:
            raise exceptions.DataError(
                f"Error while reading file {path} for option {option}: {e}"
            ) from e
        except OpenSSL.crypto.Error as e:
            # the specific hint is checked before the generic error, otherwise it
            # would never be reached
            if option == "tls_private_key" and options.get("tls_certificate") == path:
                raise exceptions.ConfigError(
                    f"You are using the same file for private key and public "
                    f"certificate, make sure that both are in {path} or use "
                    f"--tls_private_key option"
                ) from e
            raise exceptions.DataError(
                f"Error while parsing file {path} for option {option}, are you sure "
                f"it is a valid .pem file?"
            ) from e

    return ssl.CertificateOptions(**cert_options)