comparison libervia/backend/plugins/plugin_misc_ip.py @ 4071:4b842c1fb686

refactoring: renamed `sat` package to `libervia.backend`
author Goffi <goffi@goffi.org>
date Fri, 02 Jun 2023 11:49:51 +0200
parents sat/plugins/plugin_misc_ip.py@524856bd7b19
children b86912d3fd33
comparison
equal deleted inserted replaced
4070:d10748475025 4071:4b842c1fb686
1 #!/usr/bin/env python3
2
3
4 # SAT plugin for IP address discovery
5 # Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org)
6
7 # This program is free software: you can redistribute it and/or modify
8 # it under the terms of the GNU Affero General Public License as published by
9 # the Free Software Foundation, either version 3 of the License, or
10 # (at your option) any later version.
11
12 # This program is distributed in the hope that it will be useful,
13 # but WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 # GNU Affero General Public License for more details.
16
17 # You should have received a copy of the GNU Affero General Public License
18 # along with this program. If not, see <http://www.gnu.org/licenses/>.
19
20 import urllib.parse
21 from libervia.backend.core.i18n import _, D_
22 from libervia.backend.core.constants import Const as C
23 from libervia.backend.core.log import getLogger
24 from libervia.backend.tools import xml_tools
25 from wokkel import disco, iwokkel
26 from twisted.web import client as webclient
27 from twisted.web import error as web_error
28 from twisted.internet import defer
29 from twisted.internet import reactor
30 from twisted.internet import protocol
31 from twisted.internet import endpoints
32 from twisted.internet import error as internet_error
33 from zope.interface import implementer
34 from twisted.words.protocols.jabber.xmlstream import XMPPHandler
35 from twisted.words.protocols.jabber.error import StanzaError
36
37 log = getLogger(__name__)
38
39 try:
40 import netifaces
41 except ImportError:
42 log.warning(
43 "netifaces is not available, it help discovering IPs, you can install it on https://pypi.python.org/pypi/netifaces"
44 )
45 netifaces = None
46
47
48 PLUGIN_INFO = {
49 C.PI_NAME: "IP discovery",
50 C.PI_IMPORT_NAME: "IP",
51 C.PI_TYPE: C.PLUG_TYPE_MISC,
52 C.PI_MODES: C.PLUG_MODE_BOTH,
53 C.PI_PROTOCOLS: ["XEP-0279"],
54 C.PI_RECOMMENDATIONS: ["NAT-PORT"],
55 C.PI_MAIN: "IPPlugin",
56 C.PI_HANDLER: "yes",
57 C.PI_DESCRIPTION: _("""This plugin help to discover our external IP address."""),
58 }
59
60 # TODO: GET_IP_PAGE should be configurable in sat.conf
61 GET_IP_PAGE = (
62 "http://salut-a-toi.org/whereami/"
63 ) # This page must only return external IP of the requester
64 GET_IP_LABEL = D_("Allow external get IP")
65 GET_IP_CATEGORY = "General"
66 GET_IP_NAME = "allow_get_ip"
67 GET_IP_CONFIRM_TITLE = D_("Confirm external site request")
68 GET_IP_CONFIRM = D_(
69 """To facilitate data transfer, we need to contact a website.
70 A request will be done on {page}
71 That means that administrators of {domain} can know that you use "{app_name}" and your IP Address.
72
73 IP address is an identifier to locate you on Internet (similar to a phone number).
74
75 Do you agree to do this request ?
76 """
77 ).format(
78 page=GET_IP_PAGE, domain=urllib.parse.urlparse(GET_IP_PAGE).netloc, app_name=C.APP_NAME
79 )
80 NS_IP_CHECK = "urn:xmpp:sic:1"
81
82 PARAMS = """
83 <params>
84 <general>
85 <category name="{category}">
86 <param name="{name}" label="{label}" type="bool" />
87 </category>
88 </general>
89 </params>
90 """.format(
91 category=GET_IP_CATEGORY, name=GET_IP_NAME, label=GET_IP_LABEL
92 )
93
94
95 class IPPlugin(object):
96 # TODO: refresh IP if a new connection is detected
97 # TODO: manage IPv6 when implemented in SàT
98
99 def __init__(self, host):
100 log.info(_("plugin IP discovery initialization"))
101 self.host = host
102 host.memory.update_params(PARAMS)
103
104 # NAT-Port
105 try:
106 self._nat = host.plugins["NAT-PORT"]
107 except KeyError:
108 log.debug("NAT port plugin not available")
109 self._nat = None
110
111 # XXX: cache is kept until SàT is restarted
112 # if IP may have changed, use self.refresh_ip
113 self._external_ip_cache = None
114 self._local_ip_cache = None
115
116 def get_handler(self, client):
117 return IPPlugin_handler()
118
119 def refresh_ip(self):
120 # FIXME: use a trigger instead ?
121 self._external_ip_cache = None
122 self._local_ip_cache = None
123
124 def _external_allowed(self, client):
125 """Return value of parameter with autorisation of user to do external requests
126
127 if parameter is not set, a dialog is shown to use to get its confirmation, and parameted is set according to answer
128 @return (defer.Deferred[bool]): True if external request is autorised
129 """
130 allow_get_ip = self.host.memory.params.param_get_a(
131 GET_IP_NAME, GET_IP_CATEGORY, use_default=False
132 )
133
134 if allow_get_ip is None:
135 # we don't have autorisation from user yet to use get_ip, we ask him
136 def param_set(allowed):
137 # FIXME: we need to use bool_const as param_set only manage str/unicode
138 # need to be fixed when params will be refactored
139 self.host.memory.param_set(
140 GET_IP_NAME, C.bool_const(allowed), GET_IP_CATEGORY
141 )
142 return allowed
143
144 d = xml_tools.defer_confirm(
145 self.host,
146 _(GET_IP_CONFIRM),
147 _(GET_IP_CONFIRM_TITLE),
148 profile=client.profile,
149 )
150 d.addCallback(param_set)
151 return d
152
153 return defer.succeed(allow_get_ip)
154
155 def _filter_addresse(self, ip_addr):
156 """Filter acceptable addresses
157
158 For now, just remove IPv4 local addresses
159 @param ip_addr(str): IP addresse
160 @return (bool): True if addresse is acceptable
161 """
162 return not ip_addr.startswith("127.")
163
164 def _insert_first(self, addresses, ip_addr):
165 """Insert ip_addr as first item in addresses
166
167 @param addresses(list): list of IP addresses
168 @param ip_addr(str): IP addresse
169 """
170 if ip_addr in addresses:
171 if addresses[0] != ip_addr:
172 addresses.remove(ip_addr)
173 addresses.insert(0, ip_addr)
174 else:
175 addresses.insert(0, ip_addr)
176
177 async def _get_ip_from_external(self, ext_url):
178 """Get local IP by doing a connection on an external url
179
180 @param ext_utl(str): url to connect to
181 @return (str, None): return local IP, or None if it's not possible
182 """
183 url = urllib.parse.urlparse(ext_url)
184 port = url.port
185 if port is None:
186 if url.scheme == "http":
187 port = 80
188 elif url.scheme == "https":
189 port = 443
190 else:
191 log.error("Unknown url scheme: {}".format(url.scheme))
192 return None
193 if url.hostname is None:
194 log.error("Can't find url hostname for {}".format(GET_IP_PAGE))
195
196 point = endpoints.TCP4ClientEndpoint(reactor, url.hostname, port)
197
198 p = await endpoints.connectProtocol(point, protocol.Protocol())
199 local_ip = p.transport.getHost().host
200 p.transport.loseConnection()
201 return local_ip
202
203 @defer.inlineCallbacks
204 def get_local_i_ps(self, client):
205 """Try do discover local area network IPs
206
207 @return (deferred): list of lan IP addresses
208 if there are several addresses, the one used with the server is put first
209 if no address is found, localhost IP will be in the list
210 """
211 # TODO: manage permission requesting (e.g. for UMTS link)
212 if self._local_ip_cache is not None:
213 defer.returnValue(self._local_ip_cache)
214 addresses = []
215 localhost = ["127.0.0.1"]
216
217 # we first try our luck with netifaces
218 if netifaces is not None:
219 addresses = []
220 for interface in netifaces.interfaces():
221 if_addresses = netifaces.ifaddresses(interface)
222 try:
223 inet_list = if_addresses[netifaces.AF_INET]
224 except KeyError:
225 continue
226 for data in inet_list:
227 addresse = data["addr"]
228 if self._filter_addresse(addresse):
229 addresses.append(addresse)
230
231 # then we use our connection to server
232 ip = client.xmlstream.transport.getHost().host
233 if self._filter_addresse(ip):
234 self._insert_first(addresses, ip)
235 defer.returnValue(addresses)
236
237 # if server is local, we try with NAT-Port
238 if self._nat is not None:
239 nat_ip = yield self._nat.get_ip(local=True)
240 if nat_ip is not None:
241 self._insert_first(addresses, nat_ip)
242 defer.returnValue(addresses)
243
244 if addresses:
245 defer.returnValue(addresses)
246
247 # still not luck, we need to contact external website
248 allow_get_ip = yield self._external_allowed(client)
249
250 if not allow_get_ip:
251 defer.returnValue(addresses or localhost)
252
253 try:
254 local_ip = yield defer.ensureDeferred(self._get_ip_from_external(GET_IP_PAGE))
255 except (internet_error.DNSLookupError, internet_error.TimeoutError):
256 log.warning("Can't access Domain Name System")
257 else:
258 if local_ip is not None:
259 self._insert_first(addresses, local_ip)
260
261 defer.returnValue(addresses or localhost)
262
263 @defer.inlineCallbacks
264 def get_external_ip(self, client):
265 """Try to discover external IP
266
267 @return (deferred): external IP address or None if it can't be discovered
268 """
269 if self._external_ip_cache is not None:
270 defer.returnValue(self._external_ip_cache)
271
272 # we first try with XEP-0279
273 ip_check = yield self.host.hasFeature(client, NS_IP_CHECK)
274 if ip_check:
275 log.debug("Server IP Check available, we use it to retrieve our IP")
276 iq_elt = client.IQ("get")
277 iq_elt.addElement((NS_IP_CHECK, "address"))
278 try:
279 result_elt = yield iq_elt.send()
280 address_elt = next(result_elt.elements(NS_IP_CHECK, "address"))
281 ip_elt = next(address_elt.elements(NS_IP_CHECK, "ip"))
282 except StopIteration:
283 log.warning(
284 "Server returned invalid result on XEP-0279 request, we ignore it"
285 )
286 except StanzaError as e:
287 log.warning("error while requesting ip to server: {}".format(e))
288 else:
289 # FIXME: server IP may not be the same as external IP (server can be on local machine or network)
290 # IP should be checked to see if we have a local one, and rejected in this case
291 external_ip = str(ip_elt)
292 log.debug("External IP found: {}".format(external_ip))
293 self._external_ip_cache = external_ip
294 defer.returnValue(self._external_ip_cache)
295
296 # then with NAT-Port
297 if self._nat is not None:
298 nat_ip = yield self._nat.get_ip()
299 if nat_ip is not None:
300 self._external_ip_cache = nat_ip
301 defer.returnValue(nat_ip)
302
303 # and finally by requesting external website
304 allow_get_ip = yield self._external_allowed(client)
305 try:
306 ip = ((yield webclient.getPage(GET_IP_PAGE.encode('utf-8')))
307 if allow_get_ip else None)
308 except (internet_error.DNSLookupError, internet_error.TimeoutError):
309 log.warning("Can't access Domain Name System")
310 ip = None
311 except web_error.Error as e:
312 log.warning(
313 "Error while retrieving IP on {url}: {message}".format(
314 url=GET_IP_PAGE, message=e
315 )
316 )
317 ip = None
318 else:
319 self._external_ip_cache = ip
320 defer.returnValue(ip)
321
322
323 @implementer(iwokkel.IDisco)
324 class IPPlugin_handler(XMPPHandler):
325
326 def getDiscoInfo(self, requestor, target, nodeIdentifier=""):
327 return [disco.DiscoFeature(NS_IP_CHECK)]
328
329 def getDiscoItems(self, requestor, target, nodeIdentifier=""):
330 return []