Mercurial > libervia-backend
view libervia/backend/plugins/plugin_misc_ip.py @ 4232:0fbe5c605eb6
tests (unit/webrtc,XEP-0176, XEP-0234): Fix tests and add webrtc file transfer tests:
fix 441
author | Goffi <goffi@goffi.org> |
---|---|
date | Sat, 06 Apr 2024 12:59:50 +0200 |
parents | b86912d3fd33 |
children | 0d7bb4df2343 |
line wrap: on
line source
#!/usr/bin/env python3

# SAT plugin for IP address discovery
# Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

import urllib.parse
from libervia.backend.core.i18n import _, D_
from libervia.backend.core.constants import Const as C
from libervia.backend.core.log import getLogger
from libervia.backend.tools import xml_tools
from wokkel import disco, iwokkel
from twisted.web import client as webclient
from twisted.web import error as web_error
from twisted.internet import defer
from twisted.internet import reactor
from twisted.internet import protocol
from twisted.internet import endpoints
from twisted.internet import error as internet_error
from zope.interface import implementer
from twisted.words.protocols.jabber.xmlstream import XMPPHandler
from twisted.words.protocols.jabber.error import StanzaError

log = getLogger(__name__)

# netifaces is optional: when it is missing, local IP discovery falls back on
# the other strategies implemented in IPPlugin.get_local_ips
try:
    import netifaces
except ImportError:
    log.warning(
        "netifaces is not available, it help discovering IPs, you can install it on https://pypi.python.org/pypi/netifaces"
    )
    netifaces = None


PLUGIN_INFO = {
    C.PI_NAME: "IP discovery",
    C.PI_IMPORT_NAME: "IP",
    C.PI_TYPE: C.PLUG_TYPE_MISC,
    C.PI_MODES: C.PLUG_MODE_BOTH,
    C.PI_PROTOCOLS: ["XEP-0279"],
    C.PI_RECOMMENDATIONS: ["NAT-PORT"],
    C.PI_MAIN: "IPPlugin",
    C.PI_HANDLER: "yes",
    C.PI_DESCRIPTION: _("""This plugin help to discover our external IP address."""),
}

# TODO: GET_IP_PAGE should be configurable in sat.conf
GET_IP_PAGE = (
    "http://salut-a-toi.org/whereami/"
)  # This page must only return external IP of the requester
GET_IP_LABEL = D_("Allow external get IP")
GET_IP_CATEGORY = "General"
GET_IP_NAME = "allow_get_ip"
GET_IP_CONFIRM_TITLE = D_("Confirm external site request")
GET_IP_CONFIRM = D_(
    """To facilitate data transfer, we need to contact a website.
A request will be done on {page}
That means that administrators of {domain} can know that you use "{app_name}" and your IP Address.

IP address is an identifier to locate you on Internet (similar to a phone number).

Do you agree to do this request ?
"""
).format(
    page=GET_IP_PAGE, domain=urllib.parse.urlparse(GET_IP_PAGE).netloc, app_name=C.APP_NAME
)
NS_IP_CHECK = "urn:xmpp:sic:1"

PARAMS = """
    <params>
    <general>
    <category name="{category}">
        <param name="{name}" label="{label}" type="bool" />
    </category>
    </general>
    </params>
    """.format(
    category=GET_IP_CATEGORY, name=GET_IP_NAME, label=GET_IP_LABEL
)


class IPPlugin(object):
    """Discover the local (LAN) and external IP addresses of this client

    Local addresses are looked for with netifaces, the connection to the
    server, the NAT-PORT plugin and, as a last resort and only with the user
    agreement, a connection to GET_IP_PAGE.
    External address is looked for with XEP-0279 then the NAT-PORT plugin.
    """

    # TODO: refresh IP if a new connection is detected
    # TODO: manage IPv6 when implemented in SàT

    def __init__(self, host):
        """Register the plugin parameters and look for the NAT-PORT plugin

        @param host: backend instance; ``host.memory`` and ``host.plugins``
            are used here
        """
        log.info(_("plugin IP discovery initialization"))
        self.host = host
        host.memory.update_params(PARAMS)

        # NAT-Port
        try:
            self._nat = host.plugins["NAT-PORT"]
        except KeyError:
            log.debug("NAT port plugin not available")
            self._nat = None

        # XXX: cache is kept until SàT is restarted
        #      if IP may have changed, use self.refresh_ip
        self._external_ip_cache = None
        self._local_ip_cache = None

    def get_handler(self, client):
        """Return a new XMPP handler advertising the IP check feature"""
        return IPPlugin_handler()

    def refresh_ip(self):
        """Drop cached IP addresses so that next requests discover them again"""
        # FIXME: use a trigger instead ?
        self._external_ip_cache = None
        self._local_ip_cache = None

    def _external_allowed(self, client):
        """Return value of the parameter holding user authorisation for external requests

        If the parameter is not set, a dialog is shown to the user to get their
        confirmation, and the parameter is set according to the answer.
        @param client: client session, its ``profile`` is used for the dialog
        @return (defer.Deferred[bool]): True if external request is authorised
        """
        allow_get_ip = self.host.memory.params.param_get_a(
            GET_IP_NAME, GET_IP_CATEGORY, use_default=False
        )

        if allow_get_ip is None:
            # we don't have authorisation from user yet to use get_ip, we ask them
            def param_set(allowed):
                # FIXME: we need to use bool_const as param_set only manage str/unicode
                #        need to be fixed when params will be refactored
                self.host.memory.param_set(
                    GET_IP_NAME, C.bool_const(allowed), GET_IP_CATEGORY
                )
                return allowed

            d = xml_tools.defer_confirm(
                self.host,
                _(GET_IP_CONFIRM),
                _(GET_IP_CONFIRM_TITLE),
                profile=client.profile,
            )
            d.addCallback(param_set)
            return d

        return defer.succeed(allow_get_ip)

    def _filter_addresse(self, ip_addr):
        """Filter acceptable addresses

        For now, just remove IPv4 local addresses
        @param ip_addr(str): IP address
        @return (bool): True if address is acceptable
        """
        return not ip_addr.startswith("127.")

    def _insert_first(self, addresses, ip_addr):
        """Insert ip_addr as first item in addresses

        If ip_addr is already in the list, it is moved to the first position
        instead of being duplicated. The list is modified in place.
        @param addresses(list): list of IP addresses
        @param ip_addr(str): IP address
        """
        if ip_addr in addresses:
            if addresses[0] != ip_addr:
                addresses.remove(ip_addr)
                addresses.insert(0, ip_addr)
        else:
            addresses.insert(0, ip_addr)

    async def _get_ip_from_external(self, ext_url):
        """Get local IP by doing a connection on an external url

        Only a TCP connection is opened (no HTTP request is sent): the local
        address of the socket is read, then the connection is closed.
        @param ext_url(str): url to connect to
        @return (str, None): return local IP, or None if it's not possible
        """
        url = urllib.parse.urlparse(ext_url)
        port = url.port
        if port is None:
            if url.scheme == "http":
                port = 80
            elif url.scheme == "https":
                port = 443
            else:
                log.error("Unknown url scheme: {}".format(url.scheme))
                return None
        if url.hostname is None:
            # without a hostname there is nothing to connect to: we must stop
            # here instead of building an endpoint with host set to None
            log.error("Can't find url hostname for {}".format(ext_url))
            return None

        point = endpoints.TCP4ClientEndpoint(reactor, url.hostname, port)

        p = await endpoints.connectProtocol(point, protocol.Protocol())
        local_ip = p.transport.getHost().host
        p.transport.loseConnection()
        return local_ip

    async def get_local_ips(self, client):
        """Try do discover local area network IPs

        @param client: client session, its XML stream transport is used to get
            the address used with the server
        @return (list[str]): list of lan IP addresses
            if there are several addresses, the one used with the server is put first
            if no address is found, localhost IP will be in the list
        """
        # TODO: manage permission requesting (e.g. for UMTS link)
        # NOTE(review): _local_ip_cache is read here but nothing in this class
        #   ever stores a result in it, so this branch is currently never taken
        if self._local_ip_cache is not None:
            return self._local_ip_cache
        addresses = []
        localhost = ["127.0.0.1"]

        # we first try our luck with netifaces
        if netifaces is not None:
            addresses = []
            for interface in netifaces.interfaces():
                if_addresses = netifaces.ifaddresses(interface)
                try:
                    inet_list = if_addresses[netifaces.AF_INET]
                except KeyError:
                    # no IPv4 address on this interface
                    continue
                for data in inet_list:
                    addresse = data["addr"]
                    if self._filter_addresse(addresse):
                        addresses.append(addresse)

        # then we use our connection to server
        ip = client.xmlstream.transport.getHost().host
        if self._filter_addresse(ip):
            self._insert_first(addresses, ip)
            return addresses

        # if server is local, we try with NAT-Port
        if self._nat is not None:
            nat_ip = await self._nat.get_ip(local=True)
            if nat_ip is not None:
                self._insert_first(addresses, nat_ip)
                return addresses

            # NOTE(review): nesting of this check under the NAT-Port branch is
            #   reconstructed from a whitespace-mangled source — confirm
            if addresses:
                return addresses

        # still not luck, we need to contact external website
        allow_get_ip = await self._external_allowed(client)

        if not allow_get_ip:
            return addresses or localhost

        try:
            local_ip = await defer.ensureDeferred(self._get_ip_from_external(GET_IP_PAGE))
        except (internet_error.DNSLookupError, internet_error.TimeoutError):
            log.warning("Can't access Domain Name System")
        else:
            if local_ip is not None:
                self._insert_first(addresses, local_ip)

        return addresses or localhost

    async def get_external_ip(self, client):
        """Try to discover external IP

        The result of a successful discovery is cached in
        ``self._external_ip_cache``.
        @param client: client session, used to check server features and to
            send the XEP-0279 request
        @return (str, None): external IP address or None if it can't be discovered
        """
        if self._external_ip_cache is not None:
            return self._external_ip_cache

        # we first try with XEP-0279
        ip_check = await self.host.hasFeature(client, NS_IP_CHECK)
        if ip_check:
            log.debug("Server IP Check available, we use it to retrieve our IP")
            iq_elt = client.IQ("get")
            iq_elt.addElement((NS_IP_CHECK, "address"))
            try:
                result_elt = await iq_elt.send()
                address_elt = next(result_elt.elements(NS_IP_CHECK, "address"))
                ip_elt = next(address_elt.elements(NS_IP_CHECK, "ip"))
            except StopIteration:
                # <address/> or <ip/> element is missing from the answer
                log.warning(
                    "Server returned invalid result on XEP-0279 request, we ignore it"
                )
            except StanzaError as e:
                log.warning("error while requesting ip to server: {}".format(e))
            else:
                # FIXME: server IP may not be the same as external IP (server can be on local machine or network)
                #        IP should be checked to see if we have a local one, and rejected in this case
                external_ip = str(ip_elt)
                log.debug("External IP found: {}".format(external_ip))
                self._external_ip_cache = external_ip
                return self._external_ip_cache

        # then with NAT-Port
        if self._nat is not None:
            nat_ip = await self._nat.get_ip()
            if nat_ip is not None:
                self._external_ip_cache = nat_ip
                return nat_ip

        # and finally by requesting external website
        # NOTE(review): user authorisation is still requested here although the
        #   request itself is disabled below, so allow_get_ip is currently unused
        allow_get_ip = await self._external_allowed(client)
        ip = None

        # FIXME: following code is deprecated, check it
        # try:
        #     ip = ((await webclient.getPage(GET_IP_PAGE.encode('utf-8')))
        #           if allow_get_ip else None)
        # except (internet_error.DNSLookupError, internet_error.TimeoutError):
        #     log.warning("Can't access Domain Name System")
        #     ip = None
        # except web_error.Error as e:
        #     log.warning(
        #         "Error while retrieving IP on {url}: {message}".format(
        #             url=GET_IP_PAGE, message=e
        #         )
        #     )
        #     ip = None
        # else:
        #     self._external_ip_cache = ip
        return ip


@implementer(iwokkel.IDisco)
class IPPlugin_handler(XMPPHandler):
    """XMPP handler answering service discovery requests for the IP plugin"""

    def getDiscoInfo(self, requestor, target, nodeIdentifier=""):
        """Return the disco features of this plugin (the IP check namespace)"""
        return [disco.DiscoFeature(NS_IP_CHECK)]

    def getDiscoItems(self, requestor, target, nodeIdentifier=""):
        """Return disco items: this plugin has none"""
        return []