Mercurial > libervia-backend
diff libervia/backend/plugins/plugin_misc_ip.py @ 4071:4b842c1fb686
refactoring: renamed `sat` package to `libervia.backend`
author | Goffi <goffi@goffi.org> |
---|---|
date | Fri, 02 Jun 2023 11:49:51 +0200 |
parents | sat/plugins/plugin_misc_ip.py@524856bd7b19 |
children | b86912d3fd33 |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/libervia/backend/plugins/plugin_misc_ip.py Fri Jun 02 11:49:51 2023 +0200 @@ -0,0 +1,330 @@ +#!/usr/bin/env python3 + + +# SAT plugin for IP address discovery +# Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org) + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +import urllib.parse +from libervia.backend.core.i18n import _, D_ +from libervia.backend.core.constants import Const as C +from libervia.backend.core.log import getLogger +from libervia.backend.tools import xml_tools +from wokkel import disco, iwokkel +from twisted.web import client as webclient +from twisted.web import error as web_error +from twisted.internet import defer +from twisted.internet import reactor +from twisted.internet import protocol +from twisted.internet import endpoints +from twisted.internet import error as internet_error +from zope.interface import implementer +from twisted.words.protocols.jabber.xmlstream import XMPPHandler +from twisted.words.protocols.jabber.error import StanzaError + +log = getLogger(__name__) + +try: + import netifaces +except ImportError: + log.warning( + "netifaces is not available, it help discovering IPs, you can install it on https://pypi.python.org/pypi/netifaces" + ) + netifaces = None + + +PLUGIN_INFO = { + C.PI_NAME: "IP discovery", + C.PI_IMPORT_NAME: "IP", + C.PI_TYPE: C.PLUG_TYPE_MISC, + C.PI_MODES: C.PLUG_MODE_BOTH, + C.PI_PROTOCOLS: ["XEP-0279"], + C.PI_RECOMMENDATIONS: ["NAT-PORT"], + C.PI_MAIN: "IPPlugin", + C.PI_HANDLER: "yes", + C.PI_DESCRIPTION: _("""This plugin help to discover our external IP address."""), +} + +# TODO: GET_IP_PAGE should be configurable in sat.conf +GET_IP_PAGE = ( + "http://salut-a-toi.org/whereami/" +) # This page must only return external IP of the requester +GET_IP_LABEL = D_("Allow external get IP") +GET_IP_CATEGORY = "General" +GET_IP_NAME = "allow_get_ip" +GET_IP_CONFIRM_TITLE = D_("Confirm external site request") +GET_IP_CONFIRM = D_( + """To facilitate data transfer, we need to contact a website. +A request will be done on {page} +That means that administrators of {domain} can know that you use "{app_name}" and your IP Address. + +IP address is an identifier to locate you on Internet (similar to a phone number). + +Do you agree to do this request ? +""" +).format( + page=GET_IP_PAGE, domain=urllib.parse.urlparse(GET_IP_PAGE).netloc, app_name=C.APP_NAME +) +NS_IP_CHECK = "urn:xmpp:sic:1" + +PARAMS = """ + <params> + <general> + <category name="{category}"> + <param name="{name}" label="{label}" type="bool" /> + </category> + </general> + </params> + """.format( + category=GET_IP_CATEGORY, name=GET_IP_NAME, label=GET_IP_LABEL +) + + +class IPPlugin(object): + # TODO: refresh IP if a new connection is detected + # TODO: manage IPv6 when implemented in SàT + + def __init__(self, host): + log.info(_("plugin IP discovery initialization")) + self.host = host + host.memory.update_params(PARAMS) + + # NAT-Port + try: + self._nat = host.plugins["NAT-PORT"] + except KeyError: + log.debug("NAT port plugin not available") + self._nat = None + + # XXX: cache is kept until SàT is restarted + # if IP may have changed, use self.refresh_ip + self._external_ip_cache = None + self._local_ip_cache = None + + def get_handler(self, client): + return IPPlugin_handler() + + def refresh_ip(self): + # FIXME: use a trigger instead ? + self._external_ip_cache = None + self._local_ip_cache = None + + def _external_allowed(self, client): + """Return value of parameter with autorisation of user to do external requests + + if parameter is not set, a dialog is shown to use to get its confirmation, and parameted is set according to answer + @return (defer.Deferred[bool]): True if external request is autorised + """ + allow_get_ip = self.host.memory.params.param_get_a( + GET_IP_NAME, GET_IP_CATEGORY, use_default=False + ) + + if allow_get_ip is None: + # we don't have autorisation from user yet to use get_ip, we ask him + def param_set(allowed): + # FIXME: we need to use bool_const as param_set only manage str/unicode + # need to be fixed when params will be refactored + self.host.memory.param_set( + GET_IP_NAME, C.bool_const(allowed), GET_IP_CATEGORY + ) + return allowed + + d = xml_tools.defer_confirm( + self.host, + _(GET_IP_CONFIRM), + _(GET_IP_CONFIRM_TITLE), + profile=client.profile, + ) + d.addCallback(param_set) + return d + + return defer.succeed(allow_get_ip) + + def _filter_addresse(self, ip_addr): + """Filter acceptable addresses + + For now, just remove IPv4 local addresses + @param ip_addr(str): IP addresse + @return (bool): True if addresse is acceptable + """ + return not ip_addr.startswith("127.") + + def _insert_first(self, addresses, ip_addr): + """Insert ip_addr as first item in addresses + + @param addresses(list): list of IP addresses + @param ip_addr(str): IP addresse + """ + if ip_addr in addresses: + if addresses[0] != ip_addr: + addresses.remove(ip_addr) + addresses.insert(0, ip_addr) + else: + addresses.insert(0, ip_addr) + + async def _get_ip_from_external(self, ext_url): + """Get local IP by doing a connection on an external url + + @param ext_utl(str): url to connect to + @return (str, None): return local IP, or None if it's not possible + """ + url = urllib.parse.urlparse(ext_url) + port = url.port + if port is None: + if url.scheme == "http": + port = 80 + elif url.scheme == "https": + port = 443 + else: + log.error("Unknown url scheme: {}".format(url.scheme)) + return None + if url.hostname is None: + log.error("Can't find url hostname for {}".format(GET_IP_PAGE)) + + point = endpoints.TCP4ClientEndpoint(reactor, url.hostname, port) + + p = await endpoints.connectProtocol(point, protocol.Protocol()) + local_ip = p.transport.getHost().host + p.transport.loseConnection() + return local_ip + + @defer.inlineCallbacks + def get_local_i_ps(self, client): + """Try do discover local area network IPs + + @return (deferred): list of lan IP addresses + if there are several addresses, the one used with the server is put first + if no address is found, localhost IP will be in the list + """ + # TODO: manage permission requesting (e.g. for UMTS link) + if self._local_ip_cache is not None: + defer.returnValue(self._local_ip_cache) + addresses = [] + localhost = ["127.0.0.1"] + + # we first try our luck with netifaces + if netifaces is not None: + addresses = [] + for interface in netifaces.interfaces(): + if_addresses = netifaces.ifaddresses(interface) + try: + inet_list = if_addresses[netifaces.AF_INET] + except KeyError: + continue + for data in inet_list: + addresse = data["addr"] + if self._filter_addresse(addresse): + addresses.append(addresse) + + # then we use our connection to server + ip = client.xmlstream.transport.getHost().host + if self._filter_addresse(ip): + self._insert_first(addresses, ip) + defer.returnValue(addresses) + + # if server is local, we try with NAT-Port + if self._nat is not None: + nat_ip = yield self._nat.get_ip(local=True) + if nat_ip is not None: + self._insert_first(addresses, nat_ip) + defer.returnValue(addresses) + + if addresses: + defer.returnValue(addresses) + + # still not luck, we need to contact external website + allow_get_ip = yield self._external_allowed(client) + + if not allow_get_ip: + defer.returnValue(addresses or localhost) + + try: + local_ip = yield defer.ensureDeferred(self._get_ip_from_external(GET_IP_PAGE)) + except (internet_error.DNSLookupError, internet_error.TimeoutError): + log.warning("Can't access Domain Name System") + else: + if local_ip is not None: + self._insert_first(addresses, local_ip) + + defer.returnValue(addresses or localhost) + + @defer.inlineCallbacks + def get_external_ip(self, client): + """Try to discover external IP + + @return (deferred): external IP address or None if it can't be discovered + """ + if self._external_ip_cache is not None: + defer.returnValue(self._external_ip_cache) + + # we first try with XEP-0279 + ip_check = yield self.host.hasFeature(client, NS_IP_CHECK) + if ip_check: + log.debug("Server IP Check available, we use it to retrieve our IP") + iq_elt = client.IQ("get") + iq_elt.addElement((NS_IP_CHECK, "address")) + try: + result_elt = yield iq_elt.send() + address_elt = next(result_elt.elements(NS_IP_CHECK, "address")) + ip_elt = next(address_elt.elements(NS_IP_CHECK, "ip")) + except StopIteration: + log.warning( + "Server returned invalid result on XEP-0279 request, we ignore it" + ) + except StanzaError as e: + log.warning("error while requesting ip to server: {}".format(e)) + else: + # FIXME: server IP may not be the same as external IP (server can be on local machine or network) + # IP should be checked to see if we have a local one, and rejected in this case + external_ip = str(ip_elt) + log.debug("External IP found: {}".format(external_ip)) + self._external_ip_cache = external_ip + defer.returnValue(self._external_ip_cache) + + # then with NAT-Port + if self._nat is not None: + nat_ip = yield self._nat.get_ip() + if nat_ip is not None: + self._external_ip_cache = nat_ip + defer.returnValue(nat_ip) + + # and finally by requesting external website + allow_get_ip = yield self._external_allowed(client) + try: + ip = ((yield webclient.getPage(GET_IP_PAGE.encode('utf-8'))) + if allow_get_ip else None) + except (internet_error.DNSLookupError, internet_error.TimeoutError): + log.warning("Can't access Domain Name System") + ip = None + except web_error.Error as e: + log.warning( + "Error while retrieving IP on {url}: {message}".format( + url=GET_IP_PAGE, message=e + ) + ) + ip = None + else: + self._external_ip_cache = ip + defer.returnValue(ip) + + +@implementer(iwokkel.IDisco) +class IPPlugin_handler(XMPPHandler): + + def getDiscoInfo(self, requestor, target, nodeIdentifier=""): + return [disco.DiscoFeature(NS_IP_CHECK)] + + def getDiscoItems(self, requestor, target, nodeIdentifier=""): + return []