Mercurial > libervia-backend
comparison libervia/backend/tools/common/tls.py @ 4071:4b842c1fb686
refactoring: renamed `sat` package to `libervia.backend`
author | Goffi <goffi@goffi.org> |
---|---|
date | Fri, 02 Jun 2023 11:49:51 +0200 |
parents | sat/tools/common/tls.py@524856bd7b19 |
children | 0d7bb4df2343 |
comparison
equal
deleted
inserted
replaced
4070:d10748475025 | 4071:4b842c1fb686 |
---|---|
1 #!/usr/bin/env python3 | |
2 | |
3 # Libervia: an XMPP client | |
4 # Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org) | |
5 | |
6 # This program is free software: you can redistribute it and/or modify | |
7 # it under the terms of the GNU Affero General Public License as published by | |
8 # the Free Software Foundation, either version 3 of the License, or | |
9 # (at your option) any later version. | |
10 | |
11 # This program is distributed in the hope that it will be useful, | |
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of | |
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |
14 # GNU Affero General Public License for more details. | |
15 | |
16 # You should have received a copy of the GNU Affero General Public License | |
17 # along with this program. If not, see <http://www.gnu.org/licenses/>. | |
18 | |
19 """TLS handling with twisted""" | |
20 | |
21 from libervia.backend.core.log import getLogger | |
22 from libervia.backend.core import exceptions | |
23 from libervia.backend.tools import config as tools_config | |
24 | |
25 | |
26 try: | |
27 import OpenSSL | |
28 from twisted.internet import ssl | |
29 except ImportError: | |
30 ssl = None | |
31 | |
32 | |
33 log = getLogger(__name__) | |
34 | |
35 | |
def get_options_from_config(config, section=""):
    """Collect the TLS related options from the configuration

    Each of "tls_certificate", "tls_private_key" and "tls_chain" is looked up with
    ``tools_config.config_get``.

    @param config: configuration object, passed as is to ``config_get``
    @param section (str): configuration section where the options are looked up
    @return (dict): option name => value returned by ``config_get``
    """
    tls_option_names = ('tls_certificate', 'tls_private_key', 'tls_chain')
    return {
        name: tools_config.config_get(config, section, name)
        for name in tls_option_names
    }
41 | |
42 | |
def tls_options_check(options):
    """Validate TLS options and fill the private key path when it is missing

    Must be called only if TLS is activated.

    @param options (dict): TLS options, updated in place: when "tls_private_key" is
        empty, it is set to the value of "tls_certificate"
    @raise exceptions.ConfigError: no certificate has been specified
    """
    certificate_path = options["tls_certificate"]
    if not certificate_path:
        raise exceptions.ConfigError(
            "a TLS certificate is needed to activate HTTPS connection")

    if options["tls_private_key"]:
        # a private key is explicitly set, nothing to fill
        return

    # without a dedicated key file, the key is looked for in the certificate file
    options["tls_private_key"] = certificate_path
53 | |
54 | |
def load_certificates(f):
    """Parse every certificate found in a .pem file

    @param f (file): file obj (.pem file opened in text mode)
    @return (list[OpenSSL.crypto.X509]): certificates, in the order they appear in
        the file
    @raise OpenSSL.crypto.Error: error while parsing the file
    """
    # XXX: no method has been found to load a .pem file holding several
    #   certificates, so the file is split here, one certificate at a time
    end_marker = "-----END CERTIFICATE-----"
    parsed = []
    pending_lines = []
    # readline() returns an empty string at end of file, which ends the iteration
    for line in iter(f.readline, ""):
        pending_lines.append(line)
        if end_marker in line:
            # everything accumulated up to the end marker is one certificate
            pem_block = "".join(pending_lines)
            parsed.append(
                OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, pem_block)
            )
            pending_lines = []

    log.debug(f"{len(parsed)} certificate(s) found")
    return parsed
79 | |
80 | |
def load_p_key(f):
    """Parse the private key stored in a .pem file

    @param f (file): file obj (opened .pem file), read entirely
    @return (OpenSSL.crypto.PKey): private key object
    @raise OpenSSL.crypto.Error: error while parsing the file
    """
    crypto = OpenSSL.crypto
    return crypto.load_privatekey(crypto.FILETYPE_PEM, f.read())
89 | |
90 | |
def load_certificate(f):
    """Parse the public certificate stored in a .pem file

    @param f (file): file obj (opened .pem file), read entirely
    @return (OpenSSL.crypto.X509): public certificate
    @raise OpenSSL.crypto.Error: error while parsing the file
    """
    crypto = OpenSSL.crypto
    return crypto.load_certificate(crypto.FILETYPE_PEM, f.read())
99 | |
100 | |
def get_tls_context_factory(options):
    """Load TLS certificate and build the context factory needed for listenSSL

    @param options (dict): TLS options; "tls_private_key", "tls_certificate" and
        "tls_chain" each hold the path of a .pem file. Only "tls_chain" may be empty.
    @return (twisted.internet.ssl.CertificateOptions): context factory built from
        the loaded private key, certificate and optional extra chain
    @raise ImportError: OpenSSL or twisted's ssl module could not be imported
    @raise exceptions.ConfigError: a mandatory option is empty, or the private key
        can't be parsed from the file which is also used for the certificate
    @raise exceptions.DataError: a file can't be read, or is not a valid .pem file
    """
    if ssl is None:
        raise ImportError("Python module pyOpenSSL is not installed!")

    cert_options = {}

    for name, option, method in [
        ("privateKey", "tls_private_key", load_p_key),
        ("certificate", "tls_certificate", load_certificate),
        ("extraCertChain", "tls_chain", load_certificates),
    ]:
        path = options[option]
        if not path:
            if option != "tls_chain":
                # explicit error rather than an assert: asserts are stripped when
                # Python runs with -O, which would let a context without private
                # key or certificate be built silently
                raise exceptions.ConfigError(
                    f"the {option} option is required to build the TLS context"
                )
            # the extra certificates chain is the only optional file
            continue
        log.debug(f"loading {option} from {path}")
        try:
            with open(path) as f:
                cert_options[name] = method(f)
        except OSError as e:
            raise exceptions.DataError(
                f"Error while reading file {path} for option {option}: {e}"
            ) from e
        except OpenSSL.crypto.Error as e:
            # The private key path may be the certificate path (this is the value
            # set by tls_options_check when no key is specified): a parsing failure
            # then most likely means that the file holds no private key, so a more
            # helpful message is given. This check must come before the generic
            # error below, otherwise it can never be reached.
            if option == "tls_private_key" and options["tls_certificate"] == path:
                raise exceptions.ConfigError(
                    f"You are using the same file for private key and public "
                    f"certificate, make sure that both are in {path} or use "
                    f"--tls_private_key option"
                ) from e
            raise exceptions.DataError(
                f"Error while parsing file {path} for option {option}, are you sure "
                f"it is a valid .pem file?"
            ) from e

    return ssl.CertificateOptions(**cert_options)