Mercurial > libervia-backend
comparison libervia/backend/plugins/plugin_misc_ip.py @ 4071:4b842c1fb686
refactoring: renamed `sat` package to `libervia.backend`
author | Goffi <goffi@goffi.org> |
---|---|
date | Fri, 02 Jun 2023 11:49:51 +0200 |
parents | sat/plugins/plugin_misc_ip.py@524856bd7b19 |
children | b86912d3fd33 |
comparison
equal
deleted
inserted
replaced
4070:d10748475025 | 4071:4b842c1fb686 |
---|---|
1 #!/usr/bin/env python3 | |
2 | |
3 | |
4 # SAT plugin for IP address discovery | |
5 # Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org) | |
6 | |
7 # This program is free software: you can redistribute it and/or modify | |
8 # it under the terms of the GNU Affero General Public License as published by | |
9 # the Free Software Foundation, either version 3 of the License, or | |
10 # (at your option) any later version. | |
11 | |
12 # This program is distributed in the hope that it will be useful, | |
13 # but WITHOUT ANY WARRANTY; without even the implied warranty of | |
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |
15 # GNU Affero General Public License for more details. | |
16 | |
17 # You should have received a copy of the GNU Affero General Public License | |
18 # along with this program. If not, see <http://www.gnu.org/licenses/>. | |
19 | |
20 import urllib.parse | |
21 from libervia.backend.core.i18n import _, D_ | |
22 from libervia.backend.core.constants import Const as C | |
23 from libervia.backend.core.log import getLogger | |
24 from libervia.backend.tools import xml_tools | |
25 from wokkel import disco, iwokkel | |
26 from twisted.web import client as webclient | |
27 from twisted.web import error as web_error | |
28 from twisted.internet import defer | |
29 from twisted.internet import reactor | |
30 from twisted.internet import protocol | |
31 from twisted.internet import endpoints | |
32 from twisted.internet import error as internet_error | |
33 from zope.interface import implementer | |
34 from twisted.words.protocols.jabber.xmlstream import XMPPHandler | |
35 from twisted.words.protocols.jabber.error import StanzaError | |
36 | |
# Module-level logger, named after this module.
log = getLogger(__name__)

# netifaces is an optional dependency: when it cannot be imported the name is
# bound to None, and code using it must check for that before calling into it.
try:
    import netifaces
except ImportError:
    netifaces = None
    log.warning(
        "netifaces is not available, it help discovering IPs, "
        "you can install it on https://pypi.python.org/pypi/netifaces"
    )
46 | |
47 | |
# Plugin metadata; keys are constants taken from the backend's ``Const`` class.
PLUGIN_INFO = {
    # identification
    C.PI_NAME: "IP discovery",
    C.PI_IMPORT_NAME: "IP",
    C.PI_TYPE: C.PLUG_TYPE_MISC,
    C.PI_MODES: C.PLUG_MODE_BOTH,
    # protocol implemented and other plugins this one can take advantage of
    C.PI_PROTOCOLS: ["XEP-0279"],
    C.PI_RECOMMENDATIONS: ["NAT-PORT"],
    # entry points: main plugin class, and flag telling that a handler is provided
    C.PI_MAIN: "IPPlugin",
    C.PI_HANDLER: "yes",
    C.PI_DESCRIPTION: _("""This plugin help to discover our external IP address."""),
}
59 | |
# TODO: GET_IP_PAGE should be configurable in sat.conf
# This page must only return external IP of the requester
GET_IP_PAGE = "http://salut-a-toi.org/whereami/"

# Name, category and label of the boolean parameter storing the user's answer
# about contacting GET_IP_PAGE.
GET_IP_LABEL = D_("Allow external get IP")
GET_IP_CATEGORY = "General"
GET_IP_NAME = "allow_get_ip"

# Title and body of the confirmation dialog shown before the first request.
GET_IP_CONFIRM_TITLE = D_("Confirm external site request")
GET_IP_CONFIRM = D_(
    """To facilitate data transfer, we need to contact a website.
A request will be done on {page}
That means that administrators of {domain} can know that you use "{app_name}" and your IP Address.

IP address is an identifier to locate you on Internet (similar to a phone number).

Do you agree to do this request ?
"""
).format(
    page=GET_IP_PAGE,
    domain=urllib.parse.urlparse(GET_IP_PAGE).netloc,
    app_name=C.APP_NAME,
)

# Namespace used for the XEP-0279 request and advertised in disco info.
NS_IP_CHECK = "urn:xmpp:sic:1"

# XML declaration of the parameter, registered by IPPlugin.__init__.
PARAMS = f"""
<params>
<general>
<category name="{GET_IP_CATEGORY}">
<param name="{GET_IP_NAME}" label="{GET_IP_LABEL}" type="bool" />
</category>
</general>
</params>
"""
93 | |
94 | |
class IPPlugin(object):
    """Discover the local (LAN) and external IP addresses of a client.

    Several sources are tried in turn: local network interfaces (netifaces),
    the connection to the XMPP server, XEP-0279 (Server IP Check), the NAT-PORT
    plugin when it is loaded and, as a last resort and only with the agreement
    of the user, a request to the external page GET_IP_PAGE.
    """
    # TODO: refresh IP if a new connection is detected
    # TODO: manage IPv6 when implemented in SàT

    def __init__(self, host):
        """Register the plugin parameter and look for the NAT-PORT plugin

        @param host: backend instance; its ``memory`` and ``plugins`` attributes
            are used here
        """
        log.info(_("plugin IP discovery initialization"))
        self.host = host
        host.memory.update_params(PARAMS)

        # NAT-Port
        try:
            self._nat = host.plugins["NAT-PORT"]
        except KeyError:
            log.debug("NAT port plugin not available")
            self._nat = None

        # XXX: cache is kept until SàT is restarted
        #      if IP may have changed, use self.refresh_ip
        self._external_ip_cache = None
        self._local_ip_cache = None

    def get_handler(self, client):
        """Return the XMPP handler advertising the IP check feature

        @param client: client the handler is created for (not used here)
        @return (IPPlugin_handler): a new handler instance
        """
        return IPPlugin_handler()

    def refresh_ip(self):
        """Drop cached addresses so that the next request discovers them again"""
        # FIXME: use a trigger instead ?
        self._external_ip_cache = None
        self._local_ip_cache = None

    def _external_allowed(self, client):
        """Return value of parameter with authorisation of user to do external requests

        if parameter is not set, a dialog is shown to the user to get their
        confirmation, and the parameter is set according to the answer
        @param client: client whose profile is used for the confirmation dialog
        @return (defer.Deferred[bool]): True if external request is authorised
        """
        allow_get_ip = self.host.memory.params.param_get_a(
            GET_IP_NAME, GET_IP_CATEGORY, use_default=False
        )

        if allow_get_ip is not None:
            return defer.succeed(allow_get_ip)

        # we don't have authorisation from user yet to use get_ip, we ask them
        def param_set(allowed):
            # FIXME: we need to use bool_const as param_set only manage str/unicode
            #        need to be fixed when params will be refactored
            self.host.memory.param_set(
                GET_IP_NAME, C.bool_const(allowed), GET_IP_CATEGORY
            )
            return allowed

        d = xml_tools.defer_confirm(
            self.host,
            _(GET_IP_CONFIRM),
            _(GET_IP_CONFIRM_TITLE),
            profile=client.profile,
        )
        d.addCallback(param_set)
        return d

    def _filter_addresse(self, ip_addr):
        """Filter acceptable addresses

        For now, just remove IPv4 local addresses
        @param ip_addr(str): IP address
        @return (bool): True if address is acceptable
        """
        return not ip_addr.startswith("127.")

    def _insert_first(self, addresses, ip_addr):
        """Insert ip_addr as first item in addresses

        The list is modified in place; if ip_addr is already present, it is
        moved to the first position instead of being duplicated.
        @param addresses(list): list of IP addresses
        @param ip_addr(str): IP address
        """
        if addresses and addresses[0] == ip_addr:
            # already where we want it
            return
        if ip_addr in addresses:
            addresses.remove(ip_addr)
        addresses.insert(0, ip_addr)

    async def _get_ip_from_external(self, ext_url):
        """Get local IP by doing a connection on an external url

        Only a TCP connection is opened (nothing is sent): the address of the
        local end of the connection is read, then the connection is closed.
        @param ext_url(str): url to connect to
        @return (str, None): return local IP, or None if it's not possible
        """
        url = urllib.parse.urlparse(ext_url)
        port = url.port
        if port is None:
            # no explicit port, we use the default one of the scheme
            if url.scheme == "http":
                port = 80
            elif url.scheme == "https":
                port = 443
            else:
                log.error("Unknown url scheme: {}".format(url.scheme))
                return None
        if url.hostname is None:
            # without hostname there is nothing to connect to, we stop here
            # instead of building an endpoint with a None host
            log.error("Can't find url hostname for {}".format(ext_url))
            return None

        point = endpoints.TCP4ClientEndpoint(reactor, url.hostname, port)

        p = await endpoints.connectProtocol(point, protocol.Protocol())
        local_ip = p.transport.getHost().host
        p.transport.loseConnection()
        return local_ip

    @defer.inlineCallbacks
    def get_local_i_ps(self, client):
        """Try to discover local area network IPs

        @param client: client whose connection to the server is used
        @return (deferred): list of lan IP addresses
            if there are several addresses, the one used with the server is put first
            if no address is found, localhost IP will be in the list
        """
        # TODO: manage permission requesting (e.g. for UMTS link)
        # NOTE(review): nothing in this class stores a list in _local_ip_cache,
        #   so as written this early return is never taken; confirm whether
        #   results are meant to be cached.
        if self._local_ip_cache is not None:
            return self._local_ip_cache
        addresses = []
        localhost = ["127.0.0.1"]

        # we first try our luck with netifaces
        if netifaces is not None:
            for interface in netifaces.interfaces():
                # interfaces without IPv4 address have no AF_INET entry
                inet_list = netifaces.ifaddresses(interface).get(
                    netifaces.AF_INET, ()
                )
                addresses.extend(
                    data["addr"]
                    for data in inet_list
                    if self._filter_addresse(data["addr"])
                )

        # then we use our connection to server
        ip = client.xmlstream.transport.getHost().host
        if self._filter_addresse(ip):
            self._insert_first(addresses, ip)
            return addresses

        # if server is local, we try with NAT-Port
        if self._nat is not None:
            nat_ip = yield self._nat.get_ip(local=True)
            if nat_ip is not None:
                self._insert_first(addresses, nat_ip)
                return addresses

            # NOTE(review): this check is kept inside the NAT-Port branch; the
            #   nesting could not be read from the available listing, confirm
            #   against the repository.
            if addresses:
                return addresses

        # still not luck, we need to contact external website
        allow_get_ip = yield self._external_allowed(client)

        if not allow_get_ip:
            return addresses or localhost

        try:
            local_ip = yield defer.ensureDeferred(
                self._get_ip_from_external(GET_IP_PAGE)
            )
        except (internet_error.DNSLookupError, internet_error.TimeoutError):
            log.warning("Can't access Domain Name System")
        else:
            if local_ip is not None:
                self._insert_first(addresses, local_ip)

        return addresses or localhost

    @defer.inlineCallbacks
    def get_external_ip(self, client):
        """Try to discover external IP

        @param client: client used for the XEP-0279 request and for the
            confirmation dialog
        @return (deferred): external IP address (str) or None if it can't be discovered
        """
        if self._external_ip_cache is not None:
            return self._external_ip_cache

        # we first try with XEP-0279
        ip_check = yield self.host.hasFeature(client, NS_IP_CHECK)
        if ip_check:
            log.debug("Server IP Check available, we use it to retrieve our IP")
            iq_elt = client.IQ("get")
            iq_elt.addElement((NS_IP_CHECK, "address"))
            try:
                result_elt = yield iq_elt.send()
                address_elt = next(result_elt.elements(NS_IP_CHECK, "address"))
                ip_elt = next(address_elt.elements(NS_IP_CHECK, "ip"))
            except StopIteration:
                # <address/> or <ip/> element is missing from the answer
                log.warning(
                    "Server returned invalid result on XEP-0279 request, we ignore it"
                )
            except StanzaError as e:
                log.warning("error while requesting ip to server: {}".format(e))
            else:
                # FIXME: server IP may not be the same as external IP (server can be on local machine or network)
                #        IP should be checked to see if we have a local one, and rejected in this case
                external_ip = str(ip_elt)
                log.debug("External IP found: {}".format(external_ip))
                self._external_ip_cache = external_ip
                return self._external_ip_cache

        # then with NAT-Port
        if self._nat is not None:
            nat_ip = yield self._nat.get_ip()
            if nat_ip is not None:
                self._external_ip_cache = nat_ip
                return nat_ip

        # and finally by requesting external website
        allow_get_ip = yield self._external_allowed(client)
        if not allow_get_ip:
            return None

        # NOTE(review): getPage is reported as deprecated in recent Twisted
        #   releases; check the Twisted version in use and consider moving to
        #   twisted.web.client.Agent.
        try:
            page = yield webclient.getPage(GET_IP_PAGE.encode("utf-8"))
        except (internet_error.DNSLookupError, internet_error.TimeoutError):
            log.warning("Can't access Domain Name System")
            return None
        except web_error.Error as e:
            log.warning(
                "Error while retrieving IP on {url}: {message}".format(
                    url=GET_IP_PAGE, message=e
                )
            )
            return None

        # the page body is raw data: we convert it to a clean str so the value
        # has the same type as the one returned by the XEP-0279 branch
        if isinstance(page, bytes):
            page = page.decode("utf-8", "replace")
        ip = page.strip() or None
        self._external_ip_cache = ip
        return ip
321 | |
322 | |
@implementer(iwokkel.IDisco)
class IPPlugin_handler(XMPPHandler):
    """Service discovery handler advertising the NS_IP_CHECK feature"""

    def getDiscoInfo(self, requestor, target, nodeIdentifier=""):
        """Return the disco features of this plugin

        The arguments are not used: the same single feature is always returned.
        @return (list[disco.DiscoFeature]): list holding the NS_IP_CHECK feature
        """
        ip_check_feature = disco.DiscoFeature(NS_IP_CHECK)
        return [ip_check_feature]

    def getDiscoItems(self, requestor, target, nodeIdentifier=""):
        """Return the disco items of this plugin

        @return (list): always an empty list
        """
        return list()