comparison sat/plugins/plugin_misc_nat_port.py @ 3559:c8d980d694a7

plugin nat port: rename file to `plugin_misc_nat_port.py` to make the name valid
author Goffi <goffi@goffi.org>
date Wed, 09 Jun 2021 18:39:13 +0200
parents sat/plugins/plugin_misc_nat-port.py@be6d91572633
children 524856bd7b19
comparison
equal deleted inserted replaced
3558:d8581c83fef3 3559:c8d980d694a7
1 #!/usr/bin/env python3
2
3
4 # SAT plugin for NAT port mapping
5 # Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org)
6
7 # This program is free software: you can redistribute it and/or modify
8 # it under the terms of the GNU Affero General Public License as published by
9 # the Free Software Foundation, either version 3 of the License, or
10 # (at your option) any later version.
11
12 # This program is distributed in the hope that it will be useful,
13 # but WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 # GNU Affero General Public License for more details.
16
17 # You should have received a copy of the GNU Affero General Public License
18 # along with this program. If not, see <http://www.gnu.org/licenses/>.
19
20 from sat.core.i18n import _
21 from sat.core.constants import Const as C
22 from sat.core.log import getLogger
23
24 log = getLogger(__name__)
25 from sat.core import exceptions
26 from twisted.internet import threads
27 from twisted.internet import defer
28 from twisted.python import failure
29 import threading
30
31 try:
32 import miniupnpc
33 except ImportError:
34 raise exceptions.MissingModule(
35 "Missing module MiniUPnPc, please download/install it (and its Python binding) at http://miniupnp.free.fr/ (or use pip install miniupnpc)"
36 )
37
38
# Plugin metadata, keyed by constants taken from sat.core.constants.Const.
# C.PI_MAIN holds the name of the main class of this module (NatPort).
# NOTE(review): presumably read by the plugin loader — confirm against the host.
PLUGIN_INFO = {
    C.PI_NAME: "NAT port mapping",
    C.PI_IMPORT_NAME: "NAT-PORT",
    C.PI_TYPE: C.PLUG_TYPE_MISC,
    C.PI_MAIN: "NatPort",
    C.PI_HANDLER: "no",
    C.PI_DESCRIPTION: _("""Automatic NAT port mapping using UPnP"""),
}

# First external port tried when the caller does not request a specific one
STARTING_PORT = 6000  # starting point to automatically find a port
# Default description sent with a port mapping request (default ``desc`` of mapPort)
DEFAULT_DESC = (
    "SaT port mapping"
)  # we don't use "à" here as some buggy NAT devices don't handle charsets correctly
52
53
class MappingError(Exception):
    """Raised when a port mapping can't be created on the UPnP-IGD device."""
56
57
class NatPort:
    """Plugin mapping ports on a NAT gateway using UPnP-IGD (through miniupnpc).

    Device discovery is started in a thread as soon as the plugin is instantiated;
    ``self._initialised`` fires once it is done. Every mapping added through
    L{mapPort} is remembered, and removed again by L{unload}.
    """
    # TODO: refresh data if a new connection is detected (see plugin_misc_ip)

    # highest valid port number, upper bound of the automatic port search
    _MAX_PORT = 65535

    def __init__(self, host):
        """Create the UPnP client and launch the device discovery in a thread

        @param host: object given by the caller, only stored in ``self.host``
        """
        log.info(_("plugin NAT Port initialization"))
        self.host = host
        self._external_ip = None
        self._initialised = defer.Deferred()
        self._upnp = miniupnpc.UPnP()  # will be None if no device is available
        self._upnp.discoverdelay = 200
        self._mutex = threading.Lock()  # used to protect access to self._upnp
        self._starting_port_cache = None  # used to cache the first available port
        # list of tuples (ext_port, protocol) of ports to unmap on unload
        self._to_unmap = []
        # the discovery is blocking, so it is run in a thread; its result
        # (or failure) is forwarded to self._initialised
        discover_d = threads.deferToThread(self._discover)
        discover_d.chainDeferred(self._initialised)
        self._initialised.addErrback(self._init_failed)

    def unload(self):
        """Remove the port mappings added during this session

        @return (D, None): Deferred of the unmapping thread if there was something
            to unmap, None otherwise
        """
        if self._to_unmap:
            log.info("Cleaning mapped ports")
            return threads.deferToThread(self._unmapPortsBlocking)

    def _init_failed(self, failure_):
        """Handle a failed discovery: log the reason and disable UPnP

        Only NotFound and FeatureNotFound are handled, any other failure is
        re-raised by ``trap`` (and self._upnp is then left untouched).
        """
        e = failure_.trap(exceptions.NotFound, exceptions.FeatureNotFound)
        if e == exceptions.FeatureNotFound:
            log.info("UPnP-IGD seems to be not activated on the device")
        else:
            log.info("UPnP-IGD not available")
        self._upnp = None

    def _discover(self):
        """Blocking discovery of the UPnP-IGD device (run in a thread)

        On success, the external IP address is stored in self._external_ip.
        @raise exceptions.NotFound: no device has been found
        @raise exceptions.FeatureNotFound: the external IP address can't be retrieved
        """
        devices = self._upnp.discover()
        if devices:
            log.info("{nb} UPnP-IGD device(s) found".format(nb=devices))
        else:
            log.info("Can't find UPnP-IGD device on the local network")
            # deferToThread converts the exception to a Failure by itself
            raise exceptions.NotFound()
        self._upnp.selectigd()
        try:
            self._external_ip = self._upnp.externalipaddress()
        except Exception as e:
            raise exceptions.FeatureNotFound() from e

    def getIP(self, local=False):
        """Return IP address found with UPnP-IGD

        @param local(bool): True to get the local network (LAN) IP address,
            False to get the external one
        @return (D(str, None)): found IP address, or None if something went wrong
        """
        # NOTE(review): the returned Deferred is self._initialised itself
        #   (addCallback returns the Deferred it is called on), so callbacks added
        #   by the callers run on this shared chain

        def initialisedCb(__):
            if self._upnp is None:
                return None
            if local:
                # lanaddr can be the empty string if not found,
                # we need to return None in this case
                return self._upnp.lanaddr or None
            return self._external_ip

        return self._initialised.addCallback(initialisedCb)

    def _unmapPortsBlocking(self):
        """Unmap ports mapped in this session

        This is best effort: a port which can't be unmapped is logged, and the
        next ones are still processed. The list of ports to unmap is emptied in
        every case.
        """
        with self._mutex:
            for port, protocol in self._to_unmap:
                log.info("Unmapping port {}".format(port))
                try:
                    unmapping = self._upnp.deleteportmapping(
                        # the last parameter is remoteHost, we don't use it
                        port,
                        protocol,
                        "",
                    )
                except Exception as e:
                    # NOTE(review): miniupnpc seems to raise instead of returning a
                    #   false value on error; a single failure must not prevent
                    #   the cleaning of the remaining ports
                    log.error(
                        "Can't unmap port {port} ({protocol}): {msg}".format(
                            port=port, protocol=protocol, msg=e
                        )
                    )
                    continue

                if not unmapping:
                    log.error(
                        "Can't unmap port {port} ({protocol})".format(
                            port=port, protocol=protocol
                        )
                    )
            del self._to_unmap[:]

    def _findFreePort(self, protocol):
        """Look for an external port without existing mapping (blocking)

        self._mutex must be held by the caller.
        The search starts at the cached port if there is one, at STARTING_PORT
        otherwise, and stops at the last valid port number.
        @param protocol(str): 'TCP' or 'UDP'
        @return (int): first external port found without mapping
        @raise MappingError: no free port has been found
        """
        cached_port = self._starting_port_cache
        first_port = STARTING_PORT if cached_port is None else cached_port
        for candidate in range(first_port, self._MAX_PORT + 1):
            if self._upnp.getspecificportmapping(candidate, protocol) is None:
                if cached_port is None:
                    # XXX: we cache the first successfully found external port
                    #      to avoid testing again the first series the next time
                    self._starting_port_cache = candidate
                return candidate
        log.warning(
            "No free external port found between {first} and {last}".format(
                first=first_port, last=self._MAX_PORT
            )
        )
        raise MappingError()

    def _mapPortBlocking(self, int_port, ext_port, protocol, desc):
        """Internal blocking method to map port

        @param int_port(int): internal port to use
        @param ext_port(int): external port to use, or None to find one automatically
        @param protocol(str): 'TCP' or 'UDP'
        @param desc(str): description of the mapping
        @return (int): external port used
        @raise MappingError: the mapping can't be done
        """
        # we use mutex to avoid race condition if 2 threads
        # try to acquire a port at the same time
        with self._mutex:
            if ext_port is None:
                ext_port = self._findFreePort(protocol)

            try:
                mapping = self._upnp.addportmapping(
                    # the last parameter is remoteHost, we don't use it
                    ext_port,
                    protocol,
                    self._upnp.lanaddr,
                    int_port,
                    desc,
                    "",
                )
            except Exception as e:
                log.error(_("addportmapping error: {msg}").format(msg=e))
                raise MappingError() from e

            if not mapping:
                raise MappingError()
            # the mapping is kept to be removed on unload
            self._to_unmap.append((ext_port, protocol))

        return ext_port

    def mapPort(self, int_port, ext_port=None, protocol="TCP", desc=DEFAULT_DESC):
        """Add a port mapping

        @param int_port(int): internal port to use
        @param ext_port(int,None): external port to use, or None to find one automatically
        @param protocol(str): 'TCP' or 'UDP'
        @param desc(unicode): description of the mapping
            Some UPnP IGD devices have broken encoding. It's probably a good idea to avoid non-ascii chars here
        @return (D(int, None)): external port used in case of success, otherwise None
        """
        # NOTE(review): self._upnp is checked right away, without waiting for
        #   self._initialised; a call done before the end of the discovery will
        #   still try to map the port — confirm that callers wait for it
        if self._upnp is None:
            return defer.succeed(None)

        def mappingCb(mapped_port):
            log.info(
                "{protocol} mapping from {int_port} to {ext_port} successful".format(
                    protocol=protocol, int_port=int_port, ext_port=mapped_port
                )
            )
            return mapped_port

        def mappingEb(failure_):
            # expected failure: the Deferred result becomes None
            failure_.trap(MappingError)
            log.warning("Can't map internal {int_port}".format(int_port=int_port))

        def mappingUnknownEb(failure_):
            # anything else (including an error in mappingCb) is logged,
            # and the Deferred result becomes None too
            log.error(_("error while trying to map ports: {msg}").format(msg=failure_))

        d = threads.deferToThread(
            self._mapPortBlocking, int_port, ext_port, protocol, desc
        )
        d.addCallbacks(mappingCb, mappingEb)
        d.addErrback(mappingUnknownEb)
        return d