Mercurial > libervia-backend
comparison sat/plugins/plugin_misc_ip.py @ 4037:524856bd7b19
massive refactoring to switch from camelCase to snake_case:
Historically, Libervia (formerly SàT) used camelCase, as PEP 8 allows for code that
predates it, in order to match the coding style of Twisted.
However, snake_case is more readable and it's better to follow PEP 8 best practices, so it
has been decided to move to full snake_case. Because Libervia has a huge codebase, this
ended up as an ugly mix of camelCase and snake_case.
To fix that, this patch does a big refactoring by renaming every function and method
(including the bridge) that does not come from Twisted or Wokkel, so that they all use snake_case.
This is a massive change, and may result in some bugs.
author | Goffi <goffi@goffi.org> |
---|---|
date | Sat, 08 Apr 2023 13:54:42 +0200 |
parents | be6d91572633 |
children |
comparison
equal
deleted
inserted
replaced
4036:c4464d7ae97b | 4037:524856bd7b19 |
---|---|
97 # TODO: manage IPv6 when implemented in SàT | 97 # TODO: manage IPv6 when implemented in SàT |
98 | 98 |
99 def __init__(self, host): | 99 def __init__(self, host): |
100 log.info(_("plugin IP discovery initialization")) | 100 log.info(_("plugin IP discovery initialization")) |
101 self.host = host | 101 self.host = host |
102 host.memory.updateParams(PARAMS) | 102 host.memory.update_params(PARAMS) |
103 | 103 |
104 # NAT-Port | 104 # NAT-Port |
105 try: | 105 try: |
106 self._nat = host.plugins["NAT-PORT"] | 106 self._nat = host.plugins["NAT-PORT"] |
107 except KeyError: | 107 except KeyError: |
108 log.debug("NAT port plugin not available") | 108 log.debug("NAT port plugin not available") |
109 self._nat = None | 109 self._nat = None |
110 | 110 |
111 # XXX: cache is kept until SàT is restarted | 111 # XXX: cache is kept until SàT is restarted |
112 # if IP may have changed, use self.refreshIP | 112 # if IP may have changed, use self.refresh_ip |
113 self._external_ip_cache = None | 113 self._external_ip_cache = None |
114 self._local_ip_cache = None | 114 self._local_ip_cache = None |
115 | 115 |
116 def getHandler(self, client): | 116 def get_handler(self, client): |
117 return IPPlugin_handler() | 117 return IPPlugin_handler() |
118 | 118 |
119 def refreshIP(self): | 119 def refresh_ip(self): |
120 # FIXME: use a trigger instead ? | 120 # FIXME: use a trigger instead ? |
121 self._external_ip_cache = None | 121 self._external_ip_cache = None |
122 self._local_ip_cache = None | 122 self._local_ip_cache = None |
123 | 123 |
124 def _externalAllowed(self, client): | 124 def _external_allowed(self, client): |
125 """Return value of parameter with autorisation of user to do external requests | 125 """Return value of parameter with autorisation of user to do external requests |
126 | 126 |
127 if parameter is not set, a dialog is shown to use to get its confirmation, and parameted is set according to answer | 127 if parameter is not set, a dialog is shown to use to get its confirmation, and parameted is set according to answer |
128 @return (defer.Deferred[bool]): True if external request is autorised | 128 @return (defer.Deferred[bool]): True if external request is autorised |
129 """ | 129 """ |
130 allow_get_ip = self.host.memory.params.getParamA( | 130 allow_get_ip = self.host.memory.params.param_get_a( |
131 GET_IP_NAME, GET_IP_CATEGORY, use_default=False | 131 GET_IP_NAME, GET_IP_CATEGORY, use_default=False |
132 ) | 132 ) |
133 | 133 |
134 if allow_get_ip is None: | 134 if allow_get_ip is None: |
135 # we don't have autorisation from user yet to use get_ip, we ask him | 135 # we don't have autorisation from user yet to use get_ip, we ask him |
136 def setParam(allowed): | 136 def param_set(allowed): |
137 # FIXME: we need to use boolConst as setParam only manage str/unicode | 137 # FIXME: we need to use bool_const as param_set only manage str/unicode |
138 # need to be fixed when params will be refactored | 138 # need to be fixed when params will be refactored |
139 self.host.memory.setParam( | 139 self.host.memory.param_set( |
140 GET_IP_NAME, C.boolConst(allowed), GET_IP_CATEGORY | 140 GET_IP_NAME, C.bool_const(allowed), GET_IP_CATEGORY |
141 ) | 141 ) |
142 return allowed | 142 return allowed |
143 | 143 |
144 d = xml_tools.deferConfirm( | 144 d = xml_tools.defer_confirm( |
145 self.host, | 145 self.host, |
146 _(GET_IP_CONFIRM), | 146 _(GET_IP_CONFIRM), |
147 _(GET_IP_CONFIRM_TITLE), | 147 _(GET_IP_CONFIRM_TITLE), |
148 profile=client.profile, | 148 profile=client.profile, |
149 ) | 149 ) |
150 d.addCallback(setParam) | 150 d.addCallback(param_set) |
151 return d | 151 return d |
152 | 152 |
153 return defer.succeed(allow_get_ip) | 153 return defer.succeed(allow_get_ip) |
154 | 154 |
155 def _filterAddresse(self, ip_addr): | 155 def _filter_addresse(self, ip_addr): |
156 """Filter acceptable addresses | 156 """Filter acceptable addresses |
157 | 157 |
158 For now, just remove IPv4 local addresses | 158 For now, just remove IPv4 local addresses |
159 @param ip_addr(str): IP addresse | 159 @param ip_addr(str): IP addresse |
160 @return (bool): True if addresse is acceptable | 160 @return (bool): True if addresse is acceptable |
161 """ | 161 """ |
162 return not ip_addr.startswith("127.") | 162 return not ip_addr.startswith("127.") |
163 | 163 |
164 def _insertFirst(self, addresses, ip_addr): | 164 def _insert_first(self, addresses, ip_addr): |
165 """Insert ip_addr as first item in addresses | 165 """Insert ip_addr as first item in addresses |
166 | 166 |
167 @param addresses(list): list of IP addresses | 167 @param addresses(list): list of IP addresses |
168 @param ip_addr(str): IP addresse | 168 @param ip_addr(str): IP addresse |
169 """ | 169 """ |
172 addresses.remove(ip_addr) | 172 addresses.remove(ip_addr) |
173 addresses.insert(0, ip_addr) | 173 addresses.insert(0, ip_addr) |
174 else: | 174 else: |
175 addresses.insert(0, ip_addr) | 175 addresses.insert(0, ip_addr) |
176 | 176 |
177 async def _getIPFromExternal(self, ext_url): | 177 async def _get_ip_from_external(self, ext_url): |
178 """Get local IP by doing a connection on an external url | 178 """Get local IP by doing a connection on an external url |
179 | 179 |
180 @param ext_utl(str): url to connect to | 180 @param ext_utl(str): url to connect to |
181 @return (str, None): return local IP, or None if it's not possible | 181 @return (str, None): return local IP, or None if it's not possible |
182 """ | 182 """ |
199 local_ip = p.transport.getHost().host | 199 local_ip = p.transport.getHost().host |
200 p.transport.loseConnection() | 200 p.transport.loseConnection() |
201 return local_ip | 201 return local_ip |
202 | 202 |
203 @defer.inlineCallbacks | 203 @defer.inlineCallbacks |
204 def getLocalIPs(self, client): | 204 def get_local_i_ps(self, client): |
205 """Try do discover local area network IPs | 205 """Try do discover local area network IPs |
206 | 206 |
207 @return (deferred): list of lan IP addresses | 207 @return (deferred): list of lan IP addresses |
208 if there are several addresses, the one used with the server is put first | 208 if there are several addresses, the one used with the server is put first |
209 if no address is found, localhost IP will be in the list | 209 if no address is found, localhost IP will be in the list |
223 inet_list = if_addresses[netifaces.AF_INET] | 223 inet_list = if_addresses[netifaces.AF_INET] |
224 except KeyError: | 224 except KeyError: |
225 continue | 225 continue |
226 for data in inet_list: | 226 for data in inet_list: |
227 addresse = data["addr"] | 227 addresse = data["addr"] |
228 if self._filterAddresse(addresse): | 228 if self._filter_addresse(addresse): |
229 addresses.append(addresse) | 229 addresses.append(addresse) |
230 | 230 |
231 # then we use our connection to server | 231 # then we use our connection to server |
232 ip = client.xmlstream.transport.getHost().host | 232 ip = client.xmlstream.transport.getHost().host |
233 if self._filterAddresse(ip): | 233 if self._filter_addresse(ip): |
234 self._insertFirst(addresses, ip) | 234 self._insert_first(addresses, ip) |
235 defer.returnValue(addresses) | 235 defer.returnValue(addresses) |
236 | 236 |
237 # if server is local, we try with NAT-Port | 237 # if server is local, we try with NAT-Port |
238 if self._nat is not None: | 238 if self._nat is not None: |
239 nat_ip = yield self._nat.getIP(local=True) | 239 nat_ip = yield self._nat.get_ip(local=True) |
240 if nat_ip is not None: | 240 if nat_ip is not None: |
241 self._insertFirst(addresses, nat_ip) | 241 self._insert_first(addresses, nat_ip) |
242 defer.returnValue(addresses) | 242 defer.returnValue(addresses) |
243 | 243 |
244 if addresses: | 244 if addresses: |
245 defer.returnValue(addresses) | 245 defer.returnValue(addresses) |
246 | 246 |
247 # still not luck, we need to contact external website | 247 # still not luck, we need to contact external website |
248 allow_get_ip = yield self._externalAllowed(client) | 248 allow_get_ip = yield self._external_allowed(client) |
249 | 249 |
250 if not allow_get_ip: | 250 if not allow_get_ip: |
251 defer.returnValue(addresses or localhost) | 251 defer.returnValue(addresses or localhost) |
252 | 252 |
253 try: | 253 try: |
254 local_ip = yield defer.ensureDeferred(self._getIPFromExternal(GET_IP_PAGE)) | 254 local_ip = yield defer.ensureDeferred(self._get_ip_from_external(GET_IP_PAGE)) |
255 except (internet_error.DNSLookupError, internet_error.TimeoutError): | 255 except (internet_error.DNSLookupError, internet_error.TimeoutError): |
256 log.warning("Can't access Domain Name System") | 256 log.warning("Can't access Domain Name System") |
257 else: | 257 else: |
258 if local_ip is not None: | 258 if local_ip is not None: |
259 self._insertFirst(addresses, local_ip) | 259 self._insert_first(addresses, local_ip) |
260 | 260 |
261 defer.returnValue(addresses or localhost) | 261 defer.returnValue(addresses or localhost) |
262 | 262 |
263 @defer.inlineCallbacks | 263 @defer.inlineCallbacks |
264 def getExternalIP(self, client): | 264 def get_external_ip(self, client): |
265 """Try to discover external IP | 265 """Try to discover external IP |
266 | 266 |
267 @return (deferred): external IP address or None if it can't be discovered | 267 @return (deferred): external IP address or None if it can't be discovered |
268 """ | 268 """ |
269 if self._external_ip_cache is not None: | 269 if self._external_ip_cache is not None: |
293 self._external_ip_cache = external_ip | 293 self._external_ip_cache = external_ip |
294 defer.returnValue(self._external_ip_cache) | 294 defer.returnValue(self._external_ip_cache) |
295 | 295 |
296 # then with NAT-Port | 296 # then with NAT-Port |
297 if self._nat is not None: | 297 if self._nat is not None: |
298 nat_ip = yield self._nat.getIP() | 298 nat_ip = yield self._nat.get_ip() |
299 if nat_ip is not None: | 299 if nat_ip is not None: |
300 self._external_ip_cache = nat_ip | 300 self._external_ip_cache = nat_ip |
301 defer.returnValue(nat_ip) | 301 defer.returnValue(nat_ip) |
302 | 302 |
303 # and finally by requesting external website | 303 # and finally by requesting external website |
304 allow_get_ip = yield self._externalAllowed(client) | 304 allow_get_ip = yield self._external_allowed(client) |
305 try: | 305 try: |
306 ip = ((yield webclient.getPage(GET_IP_PAGE.encode('utf-8'))) | 306 ip = ((yield webclient.getPage(GET_IP_PAGE.encode('utf-8'))) |
307 if allow_get_ip else None) | 307 if allow_get_ip else None) |
308 except (internet_error.DNSLookupError, internet_error.TimeoutError): | 308 except (internet_error.DNSLookupError, internet_error.TimeoutError): |
309 log.warning("Can't access Domain Name System") | 309 log.warning("Can't access Domain Name System") |