Mercurial > libervia-backend
comparison src/plugins/plugin_xep_0065.py @ 1559:7cc29634b6ef
plugin XEP-0065, XEP-0096: preparation for plugin XEP-0260 implementation:
/!\ SI File Transfer (plugin XEP-0096) is temporarily broken
/!\ proxy handling is temporarily broken
plugin XEP-0096: use of Deferred for plugin XEP-0065 in the same way as for plugin XEP-0047
plugin XEP-0065:
- use of Deferred for sessions
- plugin IP is a dependency
- plugin NAT-PORT is used if available
- everything is now automatic, params are disabled for now (may be re-used in the future to force port or proxy)
- proxy infos are managed with a namedtuple
- connection candidates are managed with a dedicated class
- priorities can be used for candidates, as needed for XEP-0260
- transfer can now be managed in both directions, with client or server
- socks5 server is launched on demand, once for all profiles
- helper methods to try and find best candidate
- connection test and file transfer are done in 2 steps
author | Goffi <goffi@goffi.org> |
---|---|
date | Mon, 02 Nov 2015 22:02:41 +0100 |
parents | 3265a2639182 |
children | 44854fb5d3b2 |
comparison
equal
deleted
inserted
replaced
1558:6a8dd91476f0 | 1559:7cc29634b6ef |
---|---|
21 # You should have received a copy of the GNU Affero General Public License | 21 # You should have received a copy of the GNU Affero General Public License |
22 # along with this program. If not, see <http://www.gnu.org/licenses/>. | 22 # along with this program. If not, see <http://www.gnu.org/licenses/>. |
23 | 23 |
24 # -- | 24 # -- |
25 | 25 |
26 # This program is based on proxy65 (http://code.google.com/p/proxy65), | 26 # This module is based on proxy65 (http://code.google.com/p/proxy65), |
27 # originaly written by David Smith and modified by Fabio Forno. | 27 # originaly written by David Smith and modified by Fabio Forno. |
28 # It is sublicensed under AGPL v3 (or any later version) as allowed by the original | 28 # It is sublicensed under AGPL v3 (or any later version) as allowed by the original |
29 # license. | 29 # license. |
30 | 30 |
31 # -- | 31 # -- |
55 # THE SOFTWARE. | 55 # THE SOFTWARE. |
56 | 56 |
57 from sat.core.i18n import _ | 57 from sat.core.i18n import _ |
58 from sat.core.log import getLogger | 58 from sat.core.log import getLogger |
59 log = getLogger(__name__) | 59 log = getLogger(__name__) |
60 from twisted.internet import protocol, reactor | 60 from sat.core.constants import Const as C |
61 from twisted.internet import error | 61 from sat.core import exceptions |
62 from sat.tools import sat_defer | |
63 from twisted.internet import protocol | |
64 from twisted.internet import reactor | |
65 from twisted.internet import error as internet_error | |
62 from twisted.words.protocols.jabber import jid, client as jabber_client | 66 from twisted.words.protocols.jabber import jid, client as jabber_client |
67 from twisted.words.protocols.jabber import error as jabber_error | |
63 from twisted.protocols.basic import FileSender | 68 from twisted.protocols.basic import FileSender |
64 from twisted.words.xish import domish | 69 from twisted.words.xish import domish |
65 from twisted.web.client import getPage | 70 from twisted.internet import defer |
71 from twisted.python import failure | |
66 from sat.core.exceptions import ProfileNotInCacheError | 72 from sat.core.exceptions import ProfileNotInCacheError |
73 from collections import namedtuple | |
67 import struct | 74 import struct |
68 import hashlib | 75 import hashlib |
76 import uuid | |
69 | 77 |
70 from zope.interface import implements | 78 from zope.interface import implements |
71 | 79 |
72 try: | 80 try: |
73 from twisted.words.protocols.xmlstream import XMPPHandler | 81 from twisted.words.protocols.xmlstream import XMPPHandler |
78 | 86 |
79 IQ_SET = '/iq[@type="set"]' | 87 IQ_SET = '/iq[@type="set"]' |
80 NS_BS = 'http://jabber.org/protocol/bytestreams' | 88 NS_BS = 'http://jabber.org/protocol/bytestreams' |
81 BS_REQUEST = IQ_SET + '/query[@xmlns="' + NS_BS + '"]' | 89 BS_REQUEST = IQ_SET + '/query[@xmlns="' + NS_BS + '"]' |
82 TIMEOUT = 60 # timeout for workflow | 90 TIMEOUT = 60 # timeout for workflow |
91 DEFER_KEY = 'finished' # key of the deferred used to track session end | |
92 SERVER_STARTING_PORT = 0 # starting number for server port search (0 to ask automatic attribution) | |
93 | |
94 # priorities are candidates local priorities, must be a int between 0 and 65535 | |
95 PRIORITY_BEST_DIRECT = 10000 | |
96 PRIORITY_DIRECT = 5000 | |
97 PRIORITY_ASSISTED = 1000 | |
98 PRIORITY_PROXY = 0.2 # proxy is the last option for s5b | |
99 CANDIDATE_DELAY = 0.2 # see XEP-0260 §4 | |
100 CANDIDATE_DELAY_PROXY = 0.2 # additional time for proxy types (see XEP-0260 §4 note 3) | |
83 | 101 |
84 PLUGIN_INFO = { | 102 PLUGIN_INFO = { |
85 "name": "XEP 0065 Plugin", | 103 "name": "XEP 0065 Plugin", |
86 "import_name": "XEP-0065", | 104 "import_name": "XEP-0065", |
87 "type": "XEP", | 105 "type": "XEP", |
88 "protocols": ["XEP-0065"], | 106 "protocols": ["XEP-0065"], |
107 "dependencies": ["IP"], | |
108 "recommendations": ["NAT-PORT"], | |
89 "main": "XEP_0065", | 109 "main": "XEP_0065", |
90 "handler": "yes", | 110 "handler": "yes", |
91 "description": _("""Implementation of SOCKS5 Bytestreams""") | 111 "description": _("""Implementation of SOCKS5 Bytestreams""") |
92 } | 112 } |
93 | 113 |
94 STATE_INITIAL = 0 | 114 # XXX: by default eveything is automatic |
95 STATE_AUTH = 1 | 115 # TODO: use these params to force use of specific proxy/port/IP |
96 STATE_REQUEST = 2 | 116 # PARAMS = """ |
97 STATE_READY = 3 | 117 # <params> |
98 STATE_AUTH_USERPASS = 4 | 118 # <general> |
99 STATE_TARGET_INITIAL = 5 | 119 # <category name="File Transfer"> |
100 STATE_TARGET_AUTH = 6 | 120 # <param name="Force IP" type="string" /> |
101 STATE_TARGET_REQUEST = 7 | 121 # <param name="Force Port" type="int" constraint="1;65535" /> |
102 STATE_TARGET_READY = 8 | 122 # </category> |
103 STATE_LAST = 9 | 123 # </general> |
104 | 124 # <individual> |
105 STATE_CONNECT_PENDING = STATE_LAST + 1 | 125 # <category name="File Transfer"> |
126 # <param name="Force Proxy" value="" type="string" /> | |
127 # <param name="Force Proxy host" value="" type="string" /> | |
128 # <param name="Force Proxy port" value="" type="int" constraint="1;65535" /> | |
129 # </category> | |
130 # </individual> | |
131 # </params> | |
132 # """ | |
133 | |
134 (STATE_INITIAL, | |
135 STATE_AUTH, | |
136 STATE_REQUEST, | |
137 STATE_READY, | |
138 STATE_AUTH_USERPASS, | |
139 STATE_CLIENT_INITIAL, | |
140 STATE_CLIENT_AUTH, | |
141 STATE_CLIENT_REQUEST, | |
142 ) = xrange(8) | |
106 | 143 |
107 SOCKS5_VER = 0x05 | 144 SOCKS5_VER = 0x05 |
108 | 145 |
109 ADDR_IPV4 = 0x01 | 146 ADDR_IPV4 = 0x01 |
110 ADDR_DOMAINNAME = 0x03 | 147 ADDR_DOMAINNAME = 0x03 |
127 REPLY_TTL_EXPIRED = 0x06 | 164 REPLY_TTL_EXPIRED = 0x06 |
128 REPLY_CMD_NOT_SUPPORTED = 0x07 | 165 REPLY_CMD_NOT_SUPPORTED = 0x07 |
129 REPLY_ADDR_NOT_SUPPORTED = 0x08 | 166 REPLY_ADDR_NOT_SUPPORTED = 0x08 |
130 | 167 |
131 | 168 |
132 def calculateHash(from_jid, to_jid, sid): | 169 ProxyInfos = namedtuple("ProxyInfos", ['host', 'jid', 'port']) |
133 """Calculate SHA1 Hash according to XEP-0065 | 170 |
134 @param from_jid: jid of the requester | 171 |
135 @param to_jid: jid of the target | 172 class Candidate(object): |
136 @param sid: session id | 173 |
137 @return: hash (string)""" | 174 def __init__(self, host, port, type_, priority, jid_, id_=None, priority_local=False, factory=None): |
175 """ | |
176 @param host(unicode): host IP or domain | |
177 @param port(int): port | |
178 @param type_(unicode): stream type (one of XEP_0065.TYPE_*) | |
179 @param priority(int): priority | |
180 @param jid_(jid.JID): jid | |
181 @param id_(None, id_): Candidate ID, or None to generate | |
182 @param priority_local(bool): if True, priority is used as local priority, | |
183 else priority is used as global one (and local priority is set to 0) | |
184 """ | |
185 assert isinstance(jid_, jid.JID) | |
186 self.host, self.port, self.type, self.jid = ( | |
187 host, int(port), type_, jid_) | |
188 self.id = id_ if id_ is not None else unicode(uuid.uuid4()) | |
189 if priority_local: | |
190 self._local_priority = int(priority) | |
191 self._priority = self.calculatePriority() | |
192 else: | |
193 self._local_priority = 0 | |
194 self._priority = int(priority) | |
195 self.factory = factory | |
196 | |
197 def discard(self): | |
198 """Disconnect a candidate if it is connected | |
199 | |
200 Used to disconnect tryed client when they are discarded | |
201 """ | |
202 log.debug(u"Discarding {}".format(self)) | |
203 try: | |
204 self.factory.discard() | |
205 except AttributeError: | |
206 pass # no discard for Socks5ServerFactory | |
207 | |
208 @property | |
209 def local_priority(self): | |
210 return self._local_priority | |
211 | |
212 @property | |
213 def priority(self): | |
214 return self._priority | |
215 | |
216 def __str__(self): | |
217 # similar to __unicode__ but we don't show jid and we encode id | |
218 return "Candidate ({0.priority}): host={0.host} port={0.port} type={0.type}{id}".format( | |
219 self, | |
220 id=u" id={}".format(self.id if self.id is not None else u'').encode('utf-8', 'ignore'), | |
221 ) | |
222 | |
223 def __unicode__(self): | |
224 return u"Candidate ({0.priority}): host={0.host} port={0.port} jid={0.jid} type={0.type}{id}".format( | |
225 self, | |
226 id=u" id={}".format(self.id if self.id is not None else u''), | |
227 ) | |
228 | |
229 def __eq__(self, other): | |
230 # self.id is is not used in __eq__ as the same candidate can have | |
231 # different ids if proposed by initiator or responder | |
232 try: | |
233 return (self.host == other.host and | |
234 self.port == other.port and | |
235 self.jid == other.jid) | |
236 except (AttributeError, TypeError): | |
237 return False | |
238 | |
239 def __ne__(self, other): | |
240 return not self.__eq__(other) | |
241 | |
242 def calculatePriority(self): | |
243 """Calculate candidate priority according to XEP-0260 §2.2 | |
244 | |
245 | |
246 @return (int): priority | |
247 """ | |
248 if self.type == XEP_0065.TYPE_DIRECT: | |
249 multiplier = 126 | |
250 elif self.type == XEP_0065.TYPE_ASSISTED: | |
251 multiplier = 120 | |
252 elif self.type == XEP_0065.TYPE_TUNEL: | |
253 multiplier = 110 | |
254 elif self.type == XEP_0065.TYPE_PROXY: | |
255 multiplier = 10 | |
256 else: | |
257 raise exceptions.InternalError(u"Unknown {} type !".format(self.type)) | |
258 return 2**16 * multiplier + self._local_priority | |
259 | |
260 def startTransfer(self, session_hash=None): | |
261 self.factory.startTransfer(session_hash) | |
262 | |
263 | |
264 def getSessionHash(from_jid, to_jid, sid): | |
265 """Calculate SHA1 Hash according to XEP-0065 §5.3.2 | |
266 | |
267 @param from_jid(jid.JID): jid of the requester | |
268 @param to_jid(jid.JID): jid of the target | |
269 @param sid(unicode): session id | |
270 @return (str): hash | |
271 """ | |
138 return hashlib.sha1((sid + from_jid.full() + to_jid.full()).encode('utf-8')).hexdigest() | 272 return hashlib.sha1((sid + from_jid.full() + to_jid.full()).encode('utf-8')).hexdigest() |
139 | 273 |
140 | 274 |
141 class SOCKSv5(protocol.Protocol, FileSender): | 275 class SOCKSv5(protocol.Protocol, FileSender): |
142 def __init__(self): | 276 |
277 def __init__(self, session_hash=None): | |
278 """ | |
279 @param session_hash(str): hash of the session | |
280 must only be used in client mode | |
281 """ | |
143 log.debug(_("Protocol init")) | 282 log.debug(_("Protocol init")) |
144 self.state = STATE_INITIAL | 283 self.connection = defer.Deferred() # called when connection/auth is done |
284 if session_hash is not None: | |
285 self.server_mode = False | |
286 self._session_hash = session_hash | |
287 self.state = STATE_CLIENT_INITIAL | |
288 else: | |
289 self.server_mode = True | |
290 self.state = STATE_INITIAL | |
145 self.buf = "" | 291 self.buf = "" |
146 self.supportedAuthMechs = [AUTHMECH_ANON] | 292 self.supportedAuthMechs = [AUTHMECH_ANON] |
147 self.supportedAddrs = [ADDR_DOMAINNAME] | 293 self.supportedAddrs = [ADDR_DOMAINNAME] |
148 self.enabledCommands = [CMD_CONNECT] | 294 self.enabledCommands = [CMD_CONNECT] |
149 self.peersock = None | 295 self.peersock = None |
150 self.addressType = 0 | 296 self.addressType = 0 |
151 self.requestType = 0 | 297 self.requestType = 0 |
298 self._file_obj = None | |
299 | |
300 @property | |
301 def file_obj(self): | |
302 if self._file_obj is None: | |
303 if self.server_mode: | |
304 self._file_obj = self.factory.getSession(self._session_hash)["file"] | |
305 else: | |
306 self._file_obj = self.factory.getSession()['file'] | |
307 return self._file_obj | |
152 | 308 |
153 def _startNegotiation(self): | 309 def _startNegotiation(self): |
154 log.debug("_startNegotiation") | 310 log.debug("starting negotiation (client mode)") |
155 self.state = STATE_TARGET_AUTH | 311 self.state = STATE_CLIENT_AUTH |
156 self.transport.write(struct.pack('!3B', SOCKS5_VER, 1, AUTHMECH_ANON)) | 312 self.transport.write(struct.pack('!3B', SOCKS5_VER, 1, AUTHMECH_ANON)) |
157 | 313 |
158 def _parseNegotiation(self): | 314 def _parseNegotiation(self): |
159 log.debug("_parseNegotiation") | 315 log.debug("_parseNegotiation") |
160 try: | 316 try: |
182 # Complete negotiation w/ this method | 338 # Complete negotiation w/ this method |
183 self.transport.write(struct.pack('!BB', SOCKS5_VER, m)) | 339 self.transport.write(struct.pack('!BB', SOCKS5_VER, m)) |
184 return | 340 return |
185 | 341 |
186 # No supported mechs found, notify client and close the connection | 342 # No supported mechs found, notify client and close the connection |
343 log.warning(u"Unsupported authentication mechanism") | |
187 self.transport.write(struct.pack('!BB', SOCKS5_VER, AUTHMECH_INVALID)) | 344 self.transport.write(struct.pack('!BB', SOCKS5_VER, AUTHMECH_INVALID)) |
188 self.transport.loseConnection() | 345 self.transport.loseConnection() |
189 except struct.error: | 346 except struct.error: |
190 pass | 347 pass |
191 | 348 |
256 else: | 413 else: |
257 # Any other command is not supported | 414 # Any other command is not supported |
258 self.sendErrorReply(REPLY_CMD_NOT_SUPPORTED) | 415 self.sendErrorReply(REPLY_CMD_NOT_SUPPORTED) |
259 | 416 |
260 except struct.error: | 417 except struct.error: |
418 # The buffer is probably not complete, we need to wait more | |
261 return None | 419 return None |
262 | 420 |
263 def _makeRequest(self): | 421 def _makeRequest(self): |
264 log.debug("_makeRequest") | 422 log.debug("_makeRequest") |
265 self.state = STATE_TARGET_REQUEST | 423 # sha1 = getSessionHash(self.data["from"], self.data["to"], self.sid) |
266 sha1 = calculateHash(self.data["from"], self.data["to"], self.sid) | 424 hash_ = self._session_hash |
267 request = struct.pack('!5B%dsH' % len(sha1), SOCKS5_VER, CMD_CONNECT, 0, ADDR_DOMAINNAME, len(sha1), sha1, 0) | 425 request = struct.pack('!5B%dsH' % len(hash_), SOCKS5_VER, CMD_CONNECT, 0, ADDR_DOMAINNAME, len(hash_), hash_, 0) |
268 self.transport.write(request) | 426 self.transport.write(request) |
427 self.state = STATE_CLIENT_REQUEST | |
269 | 428 |
270 def _parseRequestReply(self): | 429 def _parseRequestReply(self): |
271 log.debug("_parseRequestReply") | 430 log.debug("_parseRequestReply") |
272 try: | 431 try: |
273 ver, rep, rsvd, self.addressType = struct.unpack('!BBBB', self.buf[:4]) | 432 ver, rep, rsvd, self.addressType = struct.unpack('!BBBB', self.buf[:4]) |
292 # Ensure reply is OK | 451 # Ensure reply is OK |
293 if rep != REPLY_SUCCESS: | 452 if rep != REPLY_SUCCESS: |
294 self.loseConnection() | 453 self.loseConnection() |
295 return | 454 return |
296 | 455 |
297 if self.factory.proxy: | 456 # if self.factory.proxy: |
298 self.state = STATE_READY | 457 # self.state = STATE_READY |
299 self.factory.activateCb(self.sid, self.factory.iq_id, self.startTransfer, self.profile) | 458 # self.factory.activateCb(self.sid, self.factory.iq_id, self.startTransfer, self.profile) |
300 else: | 459 # else: |
301 self.state = STATE_TARGET_READY | 460 self.state = STATE_READY |
302 self.factory.activateCb(self.sid, self.factory.iq_id, self.profile) | 461 self.connection.callback(None) |
462 # self.factory.activateCb(self.sid, self.factory.iq_id, self.profile) | |
303 | 463 |
304 except struct.error: | 464 except struct.error: |
465 # The buffer is probably not complete, we need to wait more | |
305 return None | 466 return None |
306 | 467 |
307 def connectionMade(self): | 468 def connectionMade(self): |
308 log.debug(u"connectionMade (mode = %s)" % "requester" if isinstance(self.factory, Socks5ServerFactory) else "target") | 469 log.debug(u"Socks5 connectionMade (mode = {})".format("server" if self.state == STATE_INITIAL else "client")) |
309 | 470 if self.state == STATE_CLIENT_INITIAL: |
310 if isinstance(self.factory, Socks5ClientFactory): | |
311 self.sid = self.factory.sid | |
312 self.profile = self.factory.profile | |
313 self.data = self.factory.data | |
314 self.state = STATE_TARGET_INITIAL | |
315 self._startNegotiation() | 471 self._startNegotiation() |
316 | 472 |
317 def connectRequested(self, addr, port): | 473 def connectRequested(self, addr, port): |
318 log.debug("connectRequested") | 474 log.debug("connectRequested") |
319 | 475 |
320 # Check that this session is expected | 476 # Check that this session is expected |
321 if addr not in self.factory.hash_sid_map: | 477 if not self.factory.addToSession(addr, self): |
322 #no: we refuse it | |
323 self.sendErrorReply(REPLY_CONN_REFUSED) | 478 self.sendErrorReply(REPLY_CONN_REFUSED) |
479 log.warning(u"Unexpected connection request received from {host}" | |
480 .format(host=self.transport.getPeer().host)) | |
324 return | 481 return |
325 self.sid, self.profile = self.factory.hash_sid_map[addr] | 482 self._session_hash = addr |
326 client = self.factory.host.getClient(self.profile) | 483 # self.sid, self.profile = self.factory.hash_profiles_map[addr] |
327 client.xep_0065_current_stream[self.sid]["start_transfer_cb"] = self.startTransfer | 484 # client = self.factory.host.getClient(self.profile) |
485 # client.xep_0065_current_stream[self.sid]["start_transfer_cb"] = self.startTransfer | |
328 self.connectCompleted(addr, 0) | 486 self.connectCompleted(addr, 0) |
329 self.transport.stopReading() | 487 |
330 | 488 def startTransfer(self): |
331 def startTransfer(self, file_obj): | |
332 """Callback called when the result iq is received""" | 489 """Callback called when the result iq is received""" |
333 d = self.beginFileTransfer(file_obj, self.transport) | 490 log.debug(u"Starting file transfer") |
491 d = self.beginFileTransfer(self.file_obj, self.transport) | |
334 d.addCallback(self.fileTransfered) | 492 d.addCallback(self.fileTransfered) |
335 | 493 |
336 def fileTransfered(self, d): | 494 def fileTransfered(self, d): |
337 log.info(_("File transfer completed, closing connection")) | 495 log.info(_("File transfer completed, closing connection")) |
338 self.transport.loseConnection() | 496 self.transport.loseConnection() |
339 self.factory.finishedCb(self.sid, True, self.profile) | |
340 | 497 |
341 def connectCompleted(self, remotehost, remoteport): | 498 def connectCompleted(self, remotehost, remoteport): |
342 log.debug("connectCompleted") | 499 log.debug("connectCompleted") |
343 if self.addressType == ADDR_IPV4: | 500 if self.addressType == ADDR_IPV4: |
344 result = struct.pack('!BBBBIH', SOCKS5_VER, REPLY_SUCCESS, 0, 1, remotehost, remoteport) | 501 result = struct.pack('!BBBBIH', SOCKS5_VER, REPLY_SUCCESS, 0, 1, remotehost, remoteport) |
355 # FIXME: implement authentication and remove the debug printing a password | 512 # FIXME: implement authentication and remove the debug printing a password |
356 log.debug(u"User/pass: %s/%s" % (user, passwd)) | 513 log.debug(u"User/pass: %s/%s" % (user, passwd)) |
357 return True | 514 return True |
358 | 515 |
359 def dataReceived(self, buf): | 516 def dataReceived(self, buf): |
360 if self.state == STATE_TARGET_READY: | 517 if self.state == STATE_READY: |
361 self.data["file_obj"].write(buf) | 518 # Everything is set, we just have to write the incoming data |
519 self.file_obj.write(buf) | |
362 return | 520 return |
363 | 521 |
364 self.buf = self.buf + buf | 522 self.buf = self.buf + buf |
365 if self.state == STATE_INITIAL: | 523 if self.state == STATE_INITIAL: |
366 self._parseNegotiation() | 524 self._parseNegotiation() |
367 if self.state == STATE_AUTH_USERPASS: | 525 if self.state == STATE_AUTH_USERPASS: |
368 self._parseUserPass() | 526 self._parseUserPass() |
369 if self.state == STATE_REQUEST: | 527 if self.state == STATE_REQUEST: |
370 self._parseRequest() | 528 self._parseRequest() |
371 if self.state == STATE_TARGET_AUTH: | 529 if self.state == STATE_CLIENT_REQUEST: |
530 self._parseRequestReply() | |
531 if self.state == STATE_CLIENT_AUTH: | |
372 ver, method = struct.unpack('!BB', buf) | 532 ver, method = struct.unpack('!BB', buf) |
373 self.buf = self.buf[2:] | 533 self.buf = self.buf[2:] |
374 if ver != SOCKS5_VER or method != AUTHMECH_ANON: | 534 if ver != SOCKS5_VER or method != AUTHMECH_ANON: |
375 self.transport.loseConnection() | 535 self.transport.loseConnection() |
376 else: | 536 else: |
377 self._makeRequest() | 537 self._makeRequest() |
378 if self.state == STATE_TARGET_REQUEST: | |
379 self._parseRequestReply() | |
380 | |
381 def clientConnectionLost(self, reason): | |
382 log.debug("clientConnectionLost") | |
383 self.transport.loseConnection() | |
384 | 538 |
385 def connectionLost(self, reason): | 539 def connectionLost(self, reason): |
386 log.debug("connectionLost") | 540 log.debug(u"Socks5 connection lost: {}".format(reason.value)) |
387 if self.state != STATE_CONNECT_PENDING: | 541 # self.transport.unregisterProducer() |
388 self.transport.unregisterProducer() | 542 # if self.peersock is not None: |
389 if self.peersock is not None: | 543 # self.peersock.peersock = None |
390 self.peersock.peersock = None | 544 # self.peersock.transport.unregisterProducer() |
391 self.peersock.transport.unregisterProducer() | 545 # self.peersock = None |
392 self.peersock = None | 546 if self.state != STATE_READY: |
547 self.connection.errback(reason) | |
548 if self.server_mode : | |
549 self.factory.removeFromSession(self._session_hash, self, reason) | |
393 | 550 |
394 | 551 |
395 class Socks5ServerFactory(protocol.ServerFactory): | 552 class Socks5ServerFactory(protocol.ServerFactory): |
396 protocol = SOCKSv5 | 553 protocol = SOCKSv5 |
397 | 554 |
398 def __init__(self, host, hash_sid_map, finishedCb): | 555 def __init__(self, parent): |
399 self.host = host | 556 """ |
400 self.hash_sid_map = hash_sid_map | 557 @param parent(XEP_0065): XEP_0065 parent instance |
401 self.finishedCb = finishedCb | 558 """ |
402 | 559 self.parent = parent |
403 def startedConnecting(self, connector): | 560 |
404 log.debug(_("Socks 5 server connection started")) | 561 def getSession(self, session_hash): |
405 | 562 return self.parent.getSession(session_hash, None) |
406 def clientConnectionLost(self, connector, reason): | 563 |
407 log.debug(_(u"Socks 5 server connection lost (reason: %s)") % reason) | 564 def startTransfer(self, session_hash): |
565 session = self.getSession(session_hash) | |
566 try: | |
567 protocol = session['protocols'][0] | |
568 except (KeyError, IndexError): | |
569 log.error(u"Can't start file transfer, can't find protocol") | |
570 else: | |
571 protocol.startTransfer() | |
572 | |
573 def addToSession(self, session_hash, protocol): | |
574 """Check is session_hash is valid, and associate protocol with it | |
575 | |
576 the session will be associated to the corresponding candidate | |
577 @param session_hash(str): hash of the session | |
578 @param protocol(SOCKSv5): protocol instance | |
579 @param return(bool): True if hash was valid (i.e. expected), False else | |
580 """ | |
581 try: | |
582 session_data = self.getSession(session_hash) | |
583 except KeyError: | |
584 return False | |
585 else: | |
586 session_data.setdefault('protocols', []).append(protocol) | |
587 return True | |
588 | |
589 def removeFromSession(self, session_hash, protocol, reason): | |
590 """Remove a protocol from session_data | |
591 | |
592 There can be several protocol instances while candidates are tried, they | |
593 have removed when candidate connection is closed | |
594 @param session_hash(str): hash of the session | |
595 @param protocol(SOCKSv5): protocol instance | |
596 @param reason(failure.Failure): reason of the removal | |
597 """ | |
598 try: | |
599 protocols = self.getSession(session_hash)['protocols'] | |
600 protocols.remove(protocol) | |
601 except (KeyError, ValueError): | |
602 log.error(u"Protocol not found in session while it should be there") | |
603 else: | |
604 if not protocols: | |
605 # The last protocol has been removed, session is finished | |
606 if reason.check(internet_error.ConnectionDone): | |
607 self.getSession(session_hash)[DEFER_KEY].callback(None) | |
608 else: | |
609 self.getSession(session_hash)[DEFER_KEY].errback(reason) | |
408 | 610 |
409 | 611 |
410 class Socks5ClientFactory(protocol.ClientFactory): | 612 class Socks5ClientFactory(protocol.ClientFactory): |
411 protocol = SOCKSv5 | 613 protocol = SOCKSv5 |
412 | 614 |
413 def __init__(self, current_stream, sid, iq_id, activateCb, finishedCb, proxy=False, profile=None): | 615 # def __init__(self, stream_data, sid, iq_id, activateCb, finishedCb, proxy=False, profile=C.PROF_KEY_NONE): |
616 def __init__(self, parent, session_hash, profile): | |
414 """Init the Client Factory | 617 """Init the Client Factory |
415 @param current_stream: current streams data | 618 |
416 @param sid: Session ID | 619 @param session_hash(unicode): hash of the session |
417 @param iq_id: iq id used to initiate the stream | 620 hash is the same as hostname computer in XEP-0065 § 5.3.2 #1 |
418 @param activateCb: method to call to activate the stream | 621 @param profile(unciode): %(doc_profile)s |
419 @param finishedCb: method to call when the stream session is finished | 622 """ |
420 @param proxy: True if we are connecting throught a proxy (and we are a requester) | 623 self.session = parent.getSession(session_hash, profile) |
421 @param profile: %(doc_profile)s""" | 624 self.session_hash = session_hash |
422 assert(profile) | |
423 self.data = current_stream[sid] | |
424 self.sid = sid | |
425 self.iq_id = iq_id | |
426 self.activateCb = activateCb | |
427 self.finishedCb = finishedCb | |
428 self.proxy = proxy | |
429 self.profile = profile | 625 self.profile = profile |
430 | 626 self.connection = defer.Deferred() |
431 def startedConnecting(self, connector): | 627 self._protocol_instance = None |
432 log.debug(_("Socks 5 client connection started")) | 628 self.connector = None |
629 self._discarded = False | |
630 # self.data = stream_data[sid] | |
631 # self.sid = sid | |
632 # self.iq_id = iq_id | |
633 # self.activateCb = activateCb | |
634 # self.finishedCb = finishedCb | |
635 # self.proxy = proxy | |
636 # self.profile = profile | |
637 | |
638 def discard(self): | |
639 """Disconnect the client | |
640 | |
641 Also set a discarded flag, which avoid to call the session Deferred | |
642 """ | |
643 self.connector.disconnect() | |
644 self._discarded = True | |
645 | |
646 def getSession(self): | |
647 return self.session | |
648 | |
649 def startTransfer(self, dummy=None): | |
650 self._protocol_instance.startTransfer() | |
651 | |
652 def clientConnectionFailed(self, connector, reason): | |
653 log.debug(u"Connection failed") | |
654 self.connection.errback(reason) | |
433 | 655 |
434 def clientConnectionLost(self, connector, reason): | 656 def clientConnectionLost(self, connector, reason): |
435 log.debug(_(u"Socks 5 client connection lost (reason: %s)") % reason) | 657 log.debug(_(u"Socks 5 client connection lost (reason: %s)") % reason.value) |
436 self.finishedCb(self.sid, reason.type == error.ConnectionDone, self.profile) # TODO: really check if the state is actually successful | 658 self._protocol_instance = None |
659 if not self._discarded: | |
660 # This one was used for the transfer, than mean that | |
661 # the Socks5 session is finished | |
662 if reason.check(internet_error.ConnectionDone): | |
663 self.getSession()[DEFER_KEY].callback(None) | |
664 else: | |
665 self.getSession()[DEFER_KEY].errback(reason) | |
666 # self.finishedCb(self.sid, reason.type == internet_error.ConnectionDone, self.profile) # TODO: really check if the state is actually successful | |
667 | |
668 def buildProtocol(self, addr): | |
669 log.debug(("Socks 5 client connection started")) | |
670 p = self.protocol(session_hash=self.session_hash) | |
671 p.factory = self | |
672 p.connection.chainDeferred(self.connection) | |
673 self._protocol_instance = p | |
674 return p | |
437 | 675 |
438 | 676 |
439 class XEP_0065(object): | 677 class XEP_0065(object): |
440 | |
441 NAMESPACE = NS_BS | 678 NAMESPACE = NS_BS |
442 | 679 TYPE_DIRECT = 'direct' |
443 params = """ | 680 TYPE_ASSISTED = 'assisted' |
444 <params> | 681 TYPE_TUNEL = 'tunel' |
445 <general> | 682 TYPE_PROXY = 'proxy' |
446 <category name="File Transfer"> | 683 Candidate = Candidate |
447 <param name="IP" value='0.0.0.0' default_cb='yes' type="string" /> | |
448 <param name="Port" value="28915" type="int" constraint="1;65535" /> | |
449 </category> | |
450 </general> | |
451 <individual> | |
452 <category name="File Transfer"> | |
453 <param name="Proxy" value="" type="string" /> | |
454 <param name="Proxy host" value="" type="string" /> | |
455 <param name="Proxy port" value="" type="int" constraint="1;65535" /> | |
456 </category> | |
457 </individual> | |
458 </params> | |
459 """ | |
460 | 684 |
461 def __init__(self, host): | 685 def __init__(self, host): |
462 log.info(_("Plugin XEP_0065 initialization")) | 686 log.info(_("Plugin XEP_0065 initialization")) |
463 | |
464 #session data | |
465 self.hash_sid_map = {} # key: hash of the transfer session, value: (session id, profile) | |
466 | |
467 self.host = host | 687 self.host = host |
468 log.debug(_("registering")) | 688 |
469 self.server_factory = Socks5ServerFactory(host, self.hash_sid_map, lambda sid, success, profile: self._killId(sid, success, profile=profile)) | 689 # session data |
470 | 690 self.hash_profiles_map = {} # key: hash of the transfer session, value: session data |
471 #parameters | 691 self._cache_proxies = {} # key: server jid, value: proxy data |
472 host.memory.updateParams(XEP_0065.params) | 692 |
473 host.memory.setDefault("IP", "File Transfer", self.getExternalIP) | 693 # misc data |
474 port = int(self.host.memory.getParamA("Port", "File Transfer")) | 694 self._server_factory = None |
475 | 695 self._external_port = None |
476 log.info(_("Launching Socks5 Stream server on port %d") % port) | 696 |
477 reactor.listenTCP(port, self.server_factory) | 697 # plugins shortcuts |
698 self._ip = self.host.plugins['IP'] | |
699 try: | |
700 self._np = self.host.plugins['NAT-PORT'] | |
701 except KeyError: | |
702 log.debug(u"NAT Port plugin not available") | |
703 self._np = None | |
704 | |
705 # parameters | |
706 # XXX: params are not used for now, but they may be used in the futur to force proxy/IP | |
707 # host.memory.updateParams(PARAMS) | |
478 | 708 |
479 def getHandler(self, profile): | 709 def getHandler(self, profile): |
480 return XEP_0065_handler(self) | 710 return XEP_0065_handler(self) |
481 | 711 |
482 def profileConnected(self, profile): | 712 def profileConnected(self, profile): |
483 client = self.host.getClient(profile) | 713 client = self.host.getClient(profile) |
484 client.xep_0065_current_stream = {} # key: stream_id, value: data(dict) | 714 client.xep_0065_current_stream = {} # key: stream_id, value: session_data(dict) |
485 | 715 client._s5b_sessions = {} |
486 def getExternalIP(self): | 716 |
487 """Return IP visible from outside, by asking to a website""" | 717 def getSessionHash(self, from_jid, to_jid, sid): |
488 return getPage("http://www.goffi.org/sat_tools/get_ip.php") | 718 return getSessionHash(from_jid, to_jid, sid) |
489 | 719 |
490 def getProgress(self, sid, data, profile): | 720 def getSocks5ServerFactory(self): |
491 """Fill data with position of current transfer""" | 721 """Return server factory |
722 | |
723 The server is created if it doesn't exist yet | |
724 self._server_factory_port is set on server creation | |
725 """ | |
726 | |
727 if self._server_factory is None: | |
728 # self._server_factory = Socks5ServerFactory(self.host, self.hash_profiles_map, lambda sid, client: self._killSession(sid, client)) | |
729 self._server_factory = Socks5ServerFactory(self) | |
730 for port in xrange(SERVER_STARTING_PORT, 65356): | |
731 try: | |
732 listening_port = reactor.listenTCP(port, self._server_factory) | |
733 except internet_error.CannotListenError as e: | |
734 log.debug(u"Cannot listen on port {port}: {err_msg}{err_num}".format( | |
735 port=port, | |
736 err_msg=e.socketError.strerror, | |
737 err_num=u' (error code: {})'.format(e.socketError.errno), | |
738 )) | |
739 else: | |
740 self._server_factory_port = listening_port.getHost().port | |
741 break | |
742 | |
743 log.info(_("Socks5 Stream server launched on port {}").format(self._server_factory_port)) | |
744 return self._server_factory | |
745 | |
746 @defer.inlineCallbacks | |
747 def getProxy(self, profile): | |
748 """Return the proxy available for this profile | |
749 | |
750 cache is used between profiles using the same server | |
751 @param profile: %(doc_profile)s | |
752 @return ((D)(ProxyInfos, None)): Found proxy infos, | |
753 or None if no acceptable proxy is found | |
754 """ | |
755 def notFound(server): | |
756 log.info(u"No proxy found on this server") | |
757 self._cache_proxies[server] = None | |
758 defer.returnValue(None) | |
492 client = self.host.getClient(profile) | 759 client = self.host.getClient(profile) |
493 try: | 760 server = client.jid.host |
494 file_obj = client.xep_0065_current_stream[sid]["file_obj"] | 761 try: |
495 data["position"] = str(file_obj.tell()) | 762 defer.returnValue(self._cache_proxies[server]) |
496 data["size"] = str(client.xep_0065_current_stream[sid]["size"]) | 763 except KeyError: |
497 except: | |
498 pass | 764 pass |
499 | 765 try: |
500 def _timeOut(self, sid, profile): | 766 proxy = (yield self.host.findServiceEntities('proxy', 'bytestreams', profile=profile)).pop() |
767 except (exceptions.CancelError, StopIteration): | |
768 notFound(server) | |
769 iq_elt = client.IQ('get') | |
770 iq_elt['to'] = proxy.full() | |
771 iq_elt.addElement('query', NS_BS) | |
772 | |
773 try: | |
774 result_elt = yield iq_elt.send() | |
775 except jabber_error.StanzaError as failure: | |
776 log.warning(u"Error while requesting proxy info on {jid}: {error}" | |
777 .format(proxy.full(), failure)) | |
778 notFound(server) | |
779 | |
780 try: | |
781 query_elt = result_elt.elements(NS_BS, 'query').next() | |
782 streamhost_elt = query_elt.elements(NS_BS, 'streamhost').next() | |
783 host = streamhost_elt['host'] | |
784 jid_ = streamhost_elt['jid'] | |
785 port = streamhost_elt['port'] | |
786 if not all((host, jid, port)): | |
787 raise KeyError | |
788 jid_ = jid.JID(jid_) | |
789 except (StopIteration, KeyError, RuntimeError, jid.InvalidFormat): | |
790 log.warning(u"Invalid proxy data received from {}".format(proxy.full())) | |
791 notFound(server) | |
792 | |
793 proxy_infos = self._cache_proxies[server] = ProxyInfos(host, jid_, port) | |
794 log.info(u"Proxy found: {}".format(proxy_infos)) | |
795 defer.returnValue(proxy_infos) | |
796 | |
797 @defer.inlineCallbacks | |
798 def _getNetworkData(self, client): | |
799 """Retrieve information about network | |
800 | |
801 @param client: %(doc_client)s | |
802 @return (D(tuple[local_port, external_port, local_ips, external_ip])): network data | |
803 """ | |
804 self.getSocks5ServerFactory() | |
805 local_port = self._server_factory_port | |
806 external_ip = yield self._ip.getExternalIP(client.profile) | |
807 local_ips = yield self._ip.getLocalIPs(client.profile) | |
808 | |
809 if not local_ips: | |
810 log.warning(u"Can't find local IPs, we can't do direct connection") | |
811 else: | |
812 if external_ip is not None and self._external_port is None: | |
813 if external_ip != local_ips[0]: | |
814 log.info(u"We are probably behind a NAT") | |
815 if self._np is None: | |
816 log.warning(u"NAT port plugin not available, we can't map port") | |
817 else: | |
818 ext_port = yield self._np.mapPort(local_port, desc=u"SaT socks5 stream") | |
819 if ext_port is None: | |
820 log.warning(u"Can't map NAT port") | |
821 else: | |
822 self._external_port = ext_port | |
823 | |
824 defer.returnValue((local_port, self._external_port, local_ips, external_ip)) | |
825 | |
826 @defer.inlineCallbacks | |
827 def getCandidates(self, profile): | |
828 """Return a list of our stream candidates | |
829 | |
830 @param profile: %(doc_profile)s | |
831 @return (D(list[Candidate])): list of candidates, ordered by priority | |
832 """ | |
833 client = self.host.getClient(profile) | |
834 server_factory = yield self.getSocks5ServerFactory() | |
835 local_port, ext_port, local_ips, external_ip = yield self._getNetworkData(client) | |
836 proxy = yield self.getProxy(profile) | |
837 | |
838 # it's time to gather the candidates | |
839 candidates = [] | |
840 | |
841 # first the direct ones | |
842 if local_ips: | |
843 # the preferred direct connection | |
844 ip = local_ips.pop(0) | |
845 candidates.append(Candidate(ip, local_port, XEP_0065.TYPE_DIRECT, PRIORITY_BEST_DIRECT, client.jid, priority_local=True, factory=server_factory)) | |
846 for ip in local_ips: | |
847 candidates.append(Candidate(ip, local_port, XEP_0065.TYPE_DIRECT, PRIORITY_DIRECT, client.jid, priority_local=True, factory=server_factory)) | |
848 | |
849 # then the assisted one | |
850 if ext_port is not None: | |
851 candidates.append(Candidate(external_ip, ext_port, XEP_0065.TYPE_ASSISTED, PRIORITY_ASSISTED, client.jid, priority_local=True, factory=server_factory)) | |
852 | |
853 # finally the proxy | |
854 if proxy: | |
855 candidates.append(Candidate(proxy.host, proxy.port, XEP_0065.TYPE_PROXY, PRIORITY_PROXY, proxy.jid, priority_local=True)) | |
856 | |
857 # should be already sorted, but just in case the priorities get weird | |
858 candidates.sort(key=lambda c: c.priority, reverse=True) | |
859 | |
860 defer.returnValue(candidates) | |
861 | |
862 def _addConnector(self, connector, candidate): | |
863 """Add connector used to connect to candidate, and return client factory's connection Deferred | |
864 | |
865 the connector can be used to disconnect the candidate, and returning the factory's connection Deferred allows waiting for connection completion | |
866 @param connector: a connector implementing IConnector | |
867 @param candidate(Candidate): candidate linked to the connector | |
868 @return (D): Deferred fired when factory connection is done or has failed | |
869 """ | |
870 candidate.factory.connector = connector | |
871 return candidate.factory.connection | |
872 | |
873 def tryCandidates(self, candidates, session_hash, connection_cb=None, connection_eb=None, profile=C.PROF_KEY_NONE): | |
874 defers_list = [] | |
875 | |
876 for candidate in candidates: | |
877 factory = Socks5ClientFactory(self, session_hash, profile) | |
878 candidate.factory = factory | |
879 delay = CANDIDATE_DELAY * len(defers_list) | |
880 if candidate.type == XEP_0065.TYPE_PROXY: | |
881 delay += CANDIDATE_DELAY_PROXY | |
882 d = sat_defer.DelayedDeferred(delay, candidate.host) | |
883 d.addCallback(reactor.connectTCP, candidate.port, factory) | |
884 d.addCallback(self._addConnector, candidate) | |
885 if connection_cb is not None: | |
886 d.addCallback(lambda dummy, candidate=candidate, profile=profile: connection_cb(candidate, profile)) | |
887 if connection_eb is not None: | |
888 d.addErrback(connection_eb, candidate, profile) | |
889 defers_list.append(d) | |
890 | |
891 return defers_list | |
892 | |
893 def getBestCandidate(self, candidates, session_hash, profile=C.PROF_KEY_NONE): | |
894 defer_candidates = None | |
895 | |
896 def connectionCb(candidate, profile): | |
897 log.info(u"Connection of {} successful".format(unicode(candidate))) | |
898 for idx, other_candidate in enumerate(candidates): | |
899 try: | |
900 if other_candidate.priority < candidate.priority: | |
901 log.debug(u"Cancelling {}".format(other_candidate)) | |
902 defer_candidates[idx].cancel() | |
903 except AttributeError: | |
904 assert other_candidate is None | |
905 | |
906 def connectionEb(failure, candidate, profile): | |
907 if failure.check(defer.CancelledError): | |
908 log.debug(u"Connection of {} has been cancelled".format(candidate)) | |
909 else: | |
910 log.info(u"Connection of {candidate} Failed: {error}".format( | |
911 candidate = candidate, | |
912 error = failure.value)) | |
913 candidates[candidates.index(candidate)] = None | |
914 | |
915 def allTested(self): | |
916 log.debug(u"All candidates have been tested") | |
917 good_candidates = [c for c in candidates if c] | |
918 return good_candidates[0] if good_candidates else None | |
919 | |
920 defer_candidates = self.tryCandidates(candidates, session_hash, connectionCb, connectionEb, profile) | |
921 d_list = defer.DeferredList(defer_candidates) | |
922 d_list.addCallback(allTested) | |
923 return d_list | |
924 | |
925 def _timeOut(self, sid, client): | |
501 """Delete current_stream id, called after timeout | 926 """Delete current_stream id, called after timeout |
502 @param id: id of client.xep_0065_current_stream""" | 927 @param id: id of client.xep_0065_current_stream""" |
503 log.info(_("Socks5 Bytestream: TimeOut reached for id %(sid)s [%(profile)s]") | 928 log.info(_("Socks5 Bytestream: TimeOut reached for id {sid} [{profile}]").format( |
504 % {"sid": sid, "profile": profile}) | 929 sid=sid, profile=client.profile)) |
505 self._killId(sid, False, "TIMEOUT", profile) | 930 self._killSession(sid, client, u"TIMEOUT") |
506 | 931 |
507 def _killId(self, sid, success=False, failure_reason="UNKNOWN", profile=None): | 932 def _killSession(self, sid, client, failure_reason=None): |
508 """Delete an current_stream id, clean up associated observers | 933 """Delete a current_stream id, clean up associated observers |
509 @param sid: id of client.xep_0065_current_stream""" | 934 |
510 assert(profile) | 935 @param sid(unicode): session id |
511 client = self.host.getClient(profile) | 936 @param client: %(doc_client)s |
512 if sid not in client.xep_0065_current_stream: | 937 @param failure_reason(None, unicode): if None the session is successful |
938 else, will be used to call failure_cb | |
939 """ | |
940 try: | |
941 session = client.xep_0065_current_stream[sid] | |
942 except KeyError: | |
513 log.warning(_("kill id called on a non existant id")) | 943 log.warning(_("kill id called on a non existant id")) |
514 return | 944 return |
515 if "observer_cb" in client.xep_0065_current_stream[sid]: | 945 |
516 xmlstream = client.xep_0065_current_stream[sid]["xmlstream"] | 946 try: |
517 xmlstream.removeObserver(client.xep_0065_current_stream[sid]["event_data"], client.xep_0065_current_stream[sid]["observer_cb"]) | 947 observer_cb = session['observer_cb'] |
518 if client.xep_0065_current_stream[sid]['timer'].active(): | 948 except KeyError: |
519 client.xep_0065_current_stream[sid]['timer'].cancel() | 949 pass |
520 if "size" in client.xep_0065_current_stream[sid]: | 950 else: |
521 self.host.removeProgressCB(sid, profile) | 951 client.xmlstream.removeObserver(session["event_data"], observer_cb) |
522 | 952 |
523 file_obj = client.xep_0065_current_stream[sid]['file_obj'] | 953 if session['timer'].active(): |
524 success_cb = client.xep_0065_current_stream[sid]['success_cb'] | 954 session['timer'].cancel() |
525 failure_cb = client.xep_0065_current_stream[sid]['failure_cb'] | 955 |
526 | |
527 session_hash = client.xep_0065_current_stream[sid].get('hash') | |
528 del client.xep_0065_current_stream[sid] | 956 del client.xep_0065_current_stream[sid] |
529 if session_hash in self.hash_sid_map: | 957 |
530 #FIXME: check that self.hash_sid_map is correctly cleaned in all cases (timeout, normal flow, etc). | 958 # FIXME: to check |
531 del self.hash_sid_map[session_hash] | 959 try: |
960 session_hash = session.get['hash'] | |
961 del self.hash_profiles_map[session_hash] | |
962 # FIXME: check that self.hash_profiles_map is correctly cleaned in all cases (timeout, normal flow, etc). | |
963 except KeyError: | |
964 log.debug(u"Not hash found for this session") | |
965 pass | |
966 | |
967 success = failure_reason is None | |
968 stream_d = session[DEFER_KEY] | |
532 | 969 |
533 if success: | 970 if success: |
534 success_cb(sid, file_obj, NS_BS, profile) | 971 stream_d.callback(None) |
535 else: | 972 else: |
536 failure_cb(sid, file_obj, NS_BS, failure_reason, profile) | 973 stream_d.errback(failure.Failure(exceptions.DataError(failure_reason))) |
537 | 974 |
538 def startStream(self, file_obj, to_jid, sid, length, successCb, failureCb, size=None, profile=None): | 975 def startStream(self, file_obj, to_jid, sid, profile=C.PROF_KEY_NONE): |
539 """Launch the stream workflow | 976 """Launch the stream workflow |
977 | |
540 @param file_obj: file_obj to send | 978 @param file_obj: file_obj to send |
541 @param to_jid: JID of the recipient | 979 @param to_jid: JID of the recipient |
542 @param sid: Stream session id | 980 @param sid: Stream session id |
543 @param length: number of byte to send, or None to send until the end | |
544 @param successCb: method to call when stream successfuly finished | 981 @param successCb: method to call when stream successfuly finished |
545 @param failureCb: method to call when something goes wrong | 982 @param failureCb: method to call when something goes wrong |
546 @param profile: %(doc_profile)s""" | 983 @param profile: %(doc_profile)s |
547 assert(profile) | 984 """ |
548 client = self.host.getClient(profile) | 985 client = self.host.getClient(profile) |
549 | 986 session_data = self._createSession(file_obj, to_jid, sid, client.profile) |
550 if length is not None: | 987 |
551 log.error(_('stream length not managed yet')) | 988 session_data["to"] = to_jid |
552 return | 989 session_data["xmlstream"] = client.xmlstream |
553 | 990 hash_ = session_data["hash"] = getSessionHash(client.jid, to_jid, sid) |
554 profile_jid = client.jid | 991 |
555 xmlstream = client.xmlstream | 992 self.hash_profiles_map[hash_] = (sid, profile) |
556 | 993 |
557 data = client.xep_0065_current_stream[sid] = {} | 994 iq_elt = jabber_client.IQ(client.xmlstream, 'set') |
558 data["timer"] = reactor.callLater(TIMEOUT, self._timeOut, sid, profile) | 995 iq_elt["from"] = client.jid.full() |
559 data["file_obj"] = file_obj | |
560 data["from"] = profile_jid | |
561 data["to"] = to_jid | |
562 data["success_cb"] = successCb | |
563 data["failure_cb"] = failureCb | |
564 data["xmlstream"] = xmlstream | |
565 data["hash"] = calculateHash(profile_jid, to_jid, sid) | |
566 self.hash_sid_map[data["hash"]] = (sid, profile) | |
567 if size: | |
568 data["size"] = size | |
569 self.host.registerProgressCB(sid, self.getProgress, profile) | |
570 iq_elt = jabber_client.IQ(xmlstream, 'set') | |
571 iq_elt["from"] = profile_jid.full() | |
572 iq_elt["to"] = to_jid.full() | 996 iq_elt["to"] = to_jid.full() |
573 query_elt = iq_elt.addElement('query', NS_BS) | 997 query_elt = iq_elt.addElement('query', NS_BS) |
574 query_elt['mode'] = 'tcp' | 998 query_elt['mode'] = 'tcp' |
575 query_elt['sid'] = sid | 999 query_elt['sid'] = sid |
1000 | |
576 #first streamhost: direct connection | 1001 #first streamhost: direct connection |
577 streamhost = query_elt.addElement('streamhost') | 1002 streamhost = query_elt.addElement('streamhost') |
578 streamhost['host'] = self.host.memory.getParamA("IP", "File Transfer") | 1003 streamhost['host'] = self.host.memory.getParamA("IP", "File Transfer") |
579 streamhost['port'] = self.host.memory.getParamA("Port", "File Transfer") | 1004 streamhost['port'] = self.host.memory.getParamA("Port", "File Transfer") |
580 streamhost['jid'] = profile_jid.full() | 1005 streamhost['jid'] = client.jid.full() |
581 | 1006 |
582 #second streamhost: mediated connection, using proxy | 1007 #second streamhost: mediated connection, using proxy |
583 streamhost = query_elt.addElement('streamhost') | 1008 streamhost = query_elt.addElement('streamhost') |
584 streamhost['host'] = self.host.memory.getParamA("Proxy host", "File Transfer", profile_key=profile) | 1009 streamhost['host'] = self.host.memory.getParamA("Proxy host", "File Transfer", profile_key=profile) |
585 streamhost['port'] = self.host.memory.getParamA("Proxy port", "File Transfer", profile_key=profile) | 1010 streamhost['port'] = self.host.memory.getParamA("Proxy port", "File Transfer", profile_key=profile) |
586 streamhost['jid'] = self.host.memory.getParamA("Proxy", "File Transfer", profile_key=profile) | 1011 streamhost['jid'] = self.host.memory.getParamA("Proxy", "File Transfer", profile_key=profile) |
587 | 1012 |
588 iq_elt.addCallback(self.iqResult, sid, profile) | 1013 iq_elt.addCallback(self._IQOpen, session_data, client) |
589 iq_elt.send() | 1014 iq_elt.send() |
590 | 1015 return session_data[DEFER_KEY] |
591 def iqResult(self, sid, profile, iq_elt): | 1016 |
592 """Called when the result of open iq is received""" | 1017 def _IQOpen(self, session_data, client, iq_elt): |
1018 """Called when the result of open iq is received | |
1019 | |
1020 @param session_data(dict): data of the session | |
1021 @param client: %(doc_client)s | |
1022 @param iq_elt(domish.Element): <iq> result | |
1023 """ | |
1024 sid = session_data['id'] | |
593 if iq_elt["type"] == "error": | 1025 if iq_elt["type"] == "error": |
594 log.warning(_("Transfer failed")) | 1026 log.warning(_("Socks5 transfer failed")) |
1027 # FIXME: must clean session | |
595 return | 1028 return |
596 client = self.host.getClient(profile) | 1029 |
597 try: | 1030 try: |
598 data = client.xep_0065_current_stream[sid] | 1031 session_data = client.xep_0065_current_stream[sid] |
599 file_obj = data["file_obj"] | 1032 file_obj = session_data["file_obj"] |
600 timer = data["timer"] | 1033 timer = session_data["timer"] |
601 except KeyError: | 1034 except KeyError: |
602 log.error(_("Internal error, can't do transfer")) | 1035 raise exceptions.InternalError |
603 return | 1036 |
604 | 1037 timer.reset(TIMEOUT) |
605 if timer.active(): | 1038 |
606 timer.cancel() | 1039 query_elt = iq_elt.elements(NS_BS, 'query').next() |
607 | 1040 streamhost_elts = list(query_elt.elements(NS_BS, 'streamhost-used')) |
608 profile_jid, xmlstream = self.host.getJidNStream(profile) | 1041 |
609 query_elt = iq_elt.firstChildElement() | |
610 streamhost_elts = filter(lambda elt: elt.name == 'streamhost-used', query_elt.elements()) | |
611 if not streamhost_elts: | 1042 if not streamhost_elts: |
612 log.warning(_("No streamhost found in stream query")) | 1043 log.warning(_("No streamhost found in stream query")) |
1044 # FIXME: must clean session | |
613 return | 1045 return |
614 | 1046 |
1047 # FIXME: must be cleaned ! | |
1048 | |
615 streamhost_jid = streamhost_elts[0]['jid'] | 1049 streamhost_jid = streamhost_elts[0]['jid'] |
616 if streamhost_jid != profile_jid.full(): | 1050 if streamhost_jid != client.jid.full(): |
617 log.debug(_("A proxy server is used")) | 1051 log.debug(_("A proxy server is used")) |
618 proxy_host = self.host.memory.getParamA("Proxy host", "File Transfer", profile_key=profile) | 1052 proxy_host = self.host.memory.getParamA("Proxy host", "File Transfer", profile_key=client.profile) |
619 proxy_port = self.host.memory.getParamA("Proxy port", "File Transfer", profile_key=profile) | 1053 proxy_port = self.host.memory.getParamA("Proxy port", "File Transfer", profile_key=client.profile) |
620 proxy_jid = self.host.memory.getParamA("Proxy", "File Transfer", profile_key=profile) | 1054 proxy_jid = self.host.memory.getParamA("Proxy", "File Transfer", profile_key=client.profile) |
621 if proxy_jid != streamhost_jid: | 1055 if proxy_jid != streamhost_jid: |
622 log.warning(_("Proxy jid is not the same as in parameters, this should not happen")) | 1056 log.warning(_("Proxy jid is not the same as in parameters, this should not happen")) |
623 return | 1057 return |
624 factory = Socks5ClientFactory(client.xep_0065_current_stream, sid, None, self.activateProxyStream, lambda sid, success, profile: self._killId(sid, success, profile=profile), True, profile) | 1058 factory = Socks5ClientFactory(client.xep_0065_current_stream, sid, None, self.activateProxyStream, lambda sid, success, profile: self._killSession(sid, client), True, client.profile) |
625 reactor.connectTCP(proxy_host, int(proxy_port), factory) | 1059 reactor.connectTCP(proxy_host, int(proxy_port), factory) |
626 else: | 1060 else: |
627 data["start_transfer_cb"](file_obj) # We now activate the stream | 1061 session_data["start_transfer_cb"](file_obj) # We now activate the stream |
628 | 1062 |
629 def activateProxyStream(self, sid, iq_id, start_transfer_cb, profile): | 1063 def activateProxyStream(self, sid, iq_id, start_transfer_cb, profile): |
630 log.debug(_("activating stream")) | 1064 log.debug(_("activating stream")) |
631 client = self.host.getClient(profile) | 1065 client = self.host.getClient(profile) |
632 data = client.xep_0065_current_stream[sid] | 1066 session_data = client.xep_0065_current_stream[sid] |
633 profile_jid, xmlstream = self.host.getJidNStream(profile) | 1067 |
634 | 1068 iq_elt = client.IQ(client.xmlstream, 'set') |
635 iq_elt = client.IQ(xmlstream, 'set') | 1069 iq_elt["from"] = client.jid.full() |
636 iq_elt["from"] = profile_jid.full() | |
637 iq_elt["to"] = self.host.memory.getParamA("Proxy", "File Transfer", profile_key=profile) | 1070 iq_elt["to"] = self.host.memory.getParamA("Proxy", "File Transfer", profile_key=profile) |
638 query_elt = iq_elt.addElement('query', NS_BS) | 1071 query_elt = iq_elt.addElement('query', NS_BS) |
639 query_elt['sid'] = sid | 1072 query_elt['sid'] = sid |
640 query_elt.addElement('activate', content=data['to'].full()) | 1073 query_elt.addElement('activate', content=session_data['to'].full()) |
641 iq_elt.addCallback(self.proxyResult, sid, start_transfer_cb, data['file_obj']) | 1074 iq_elt.addCallback(self.proxyResult, sid, start_transfer_cb, session_data['file_obj']) |
642 iq_elt.send() | 1075 iq_elt.send() |
643 | 1076 |
644 def proxyResult(self, sid, start_transfer_cb, file_obj, iq_elt): | 1077 def proxyResult(self, sid, start_transfer_cb, file_obj, iq_elt): |
645 if iq_elt['type'] == 'error': | 1078 if iq_elt['type'] == 'error': |
646 log.warning(_("Can't activate the proxy stream")) | 1079 log.warning(_("Can't activate the proxy stream")) |
647 return | 1080 return |
648 else: | 1081 else: |
649 start_transfer_cb(file_obj) | 1082 start_transfer_cb(file_obj) |
650 | 1083 |
651 def prepareToReceive(self, from_jid, sid, file_obj, size, success_cb, failure_cb, profile): | 1084 def createSession(self, *args, **kwargs): |
1085 """like [_createSession] but return the session deferred instead of the whole session | |
1086 | |
1087 session deferred is fired when transfer is finished | |
1088 """ | |
1089 return self._createSession(*args, **kwargs)[DEFER_KEY] | |
1090 | |
1091 def _createSession(self, file_obj, to_jid, sid, profile): | |
652 """Called when a bytestream is imminent | 1092 """Called when a bytestream is imminent |
653 @param from_jid: jid of the sender | 1093 |
654 @param sid: Stream id | 1094 @param file_obj(file): File object where data will be written |
655 @param file_obj: File object where data will be written | 1095 @param to_jid(jid.JID): jid of the other peer |
656 @param size: full size of the data, or None if unknown | 1096 @param sid(unicode): session id |
657 @param success_cb: method to call when successfuly finished | 1097 @param profile: %(doc_profile)s |
658 @param failure_cb: method to call when something goes wrong | 1098 @return (dict): session data |
659 @param profile: %(doc_profile)s""" | 1099 """ |
660 client = self.host.getClient(profile) | 1100 client = self.host.getClient(profile) |
661 data = client.xep_0065_current_stream[sid] = {} | 1101 if sid in client.xep_0065_current_stream: |
662 data["from"] = from_jid | 1102 raise exceptions.ConflictError(u'A session with this id already exists !') |
663 data["file_obj"] = file_obj | 1103 session_data = client.xep_0065_current_stream[sid] = \ |
664 data["seq"] = -1 | 1104 {'id': sid, |
665 if size: | 1105 DEFER_KEY: defer.Deferred(), |
666 data["size"] = size | 1106 'to': to_jid, |
667 self.host.registerProgressCB(sid, self.getProgress, profile) | 1107 'file_obj': file_obj, |
668 data["timer"] = reactor.callLater(TIMEOUT, self._timeOut, sid, profile) | 1108 'seq': -1, # FIXME: to check |
669 data["success_cb"] = success_cb | 1109 'timer': reactor.callLater(TIMEOUT, self._timeOut, sid, client), |
670 data["failure_cb"] = failure_cb | 1110 } |
1111 | |
1112 return session_data | |
1113 | |
1114 def getSession(self, session_hash, profile): | |
1115 """Return session data | |
1116 | |
1117 @param session_hash(unicode): hash of the session | |
1118 hash is the same as hostname computed in XEP-0065 § 5.3.2 #1 | |
1119 @param profile(None, unicode): profile of the peer | |
1120 None is used only if profile is unknown (this is only the case | |
1121 for incoming request received by Socks5ServerFactory). None must | |
1122 only be used by Socks5ServerFactory. | |
1123 See comments below for details | |
1124 @return (dict): session data | |
1125 """ | |
1126 if profile is None: | |
1127 try: | |
1128 profile = self.hash_profiles_map[session_hash] | |
1129 except KeyError as e: | |
1130 log.warning(u"The requested session doesn't exists !") | |
1131 raise e | |
1132 client = self.host.getClient(profile) | |
1133 return client._s5b_sessions[session_hash] | |
1134 | |
1135 def registerHash(self, *args, **kwargs): | |
1136 """like [_registerHash] but return the session deferred instead of the whole session | |
1137 session deferred is fired when transfer is finished | |
1138 """ | |
1139 return self._registerHash(*args, **kwargs)[DEFER_KEY] | |
1140 | |
1141 def _registerHash(self, session_hash, file_obj, profile): | |
1142 """Create a session_data associated to hash | |
1143 | |
1144 @param session_hash(str): hash of the session | |
1145 @param file_obj(file): file-like object | |
1146 @param profile: %(doc_profile)s | |
1147 return (dict): session data | |
1148 """ | |
1149 client = self.host.getClient(profile) | |
1150 assert session_hash not in client._s5b_sessions | |
1151 session_data = client._s5b_sessions[session_hash] = { | |
1152 "file": file_obj, | |
1153 DEFER_KEY: defer.Deferred(), | |
1154 } | |
1155 if session_hash in self.hash_profiles_map: | |
1156 # The only case when 2 profiles want to register the same hash | |
1157 # is when they are on the same instance | |
1158 log.info(u"Both Socks5 peers are on the same instance") | |
1159 # XXX:If both peers are on the same instance, they'll register the same | |
1160 # session_hash, so we'll have 2 profiles for the same hash. The first | |
1161 # one will be the responder (and so the second one the initiator). | |
1162 # As we'll keep the initiator chosen candidate (see XEP-0260 § 2.4 #4), | |
1163 # responder will handle the Socks5 server. Only the server will use | |
1164 # self.hash_profiles_map to get the profile, so we can ignore the second | |
1165 # one (the initiator profile). | |
1166 # There is no easy way to know if the incoming connection | |
1167 # to the Socks5Server is from initiator or responder, so this seems a | |
1168 # reasonable workaround. | |
1169 else: | |
1170 self.hash_profiles_map[session_hash] = profile | |
1171 | |
1172 return session_data | |
671 | 1173 |
672 def streamQuery(self, iq_elt, profile): | 1174 def streamQuery(self, iq_elt, profile): |
673 """Get file using byte stream""" | 1175 """Get file using byte stream""" |
674 log.debug(_("BS stream query")) | 1176 log.debug(_("BS stream query")) |
675 client = self.host.getClient(profile) | 1177 client = self.host.getClient(profile) |
708 return | 1210 return |
709 | 1211 |
710 client.xep_0065_current_stream[sid]["streamhost"] = (sh_host, sh_port, sh_jid) | 1212 client.xep_0065_current_stream[sid]["streamhost"] = (sh_host, sh_port, sh_jid) |
711 | 1213 |
712 log.info(_("Stream proposed: host=[%(host)s] port=[%(port)s]") % {'host': sh_host, 'port': sh_port}) | 1214 log.info(_("Stream proposed: host=[%(host)s] port=[%(port)s]") % {'host': sh_host, 'port': sh_port}) |
713 factory = Socks5ClientFactory(client.xep_0065_current_stream, sid, iq_elt["id"], self.activateStream, lambda sid, success, profile: self._killId(sid, success, profile=profile), profile=profile) | 1215 factory = Socks5ClientFactory(client.xep_0065_current_stream, sid, iq_elt["id"], self.activateStream, lambda sid, success, profile: self._killSession(sid, client), profile=profile) |
714 reactor.connectTCP(sh_host, int(sh_port), factory) | 1216 reactor.connectTCP(sh_host, int(sh_port), factory) |
715 | 1217 |
716 def activateStream(self, sid, iq_id, profile): | 1218 def activateStream(self, sid, iq_id, profile): |
717 client = self.host.getClient(profile) | 1219 client = self.host.getClient(profile) |
718 log.debug(_("activating stream")) | 1220 log.debug(_("activating stream")) |
719 result = domish.Element((None, 'iq')) | 1221 result = domish.Element((None, 'iq')) |
720 data = client.xep_0065_current_stream[sid] | 1222 session_data = client.xep_0065_current_stream[sid] |
721 result['type'] = 'result' | 1223 result['type'] = 'result' |
722 result['id'] = iq_id | 1224 result['id'] = iq_id |
723 result['from'] = data["to"].full() | 1225 result['from'] = session_data["to"].full() |
724 result['to'] = data["from"].full() | 1226 result['to'] = session_data["from"].full() |
725 query = result.addElement('query', NS_BS) | 1227 query = result.addElement('query', NS_BS) |
726 query['sid'] = sid | 1228 query['sid'] = sid |
727 streamhost = query.addElement('streamhost-used') | 1229 streamhost = query.addElement('streamhost-used') |
728 streamhost['jid'] = data["streamhost"][2] | 1230 streamhost['jid'] = session_data["streamhost"][2] |
729 data["xmlstream"].send(result) | 1231 session_data["xmlstream"].send(result) |
730 | 1232 |
731 def sendNotAcceptableError(self, iq_id, to_jid, xmlstream): | 1233 def sendNotAcceptableError(self, iq_id, to_jid, xmlstream): |
732 """Not acceptable error used when the stream is not expected or something is going wrong | 1234 """Not acceptable error used when the stream is not expected or something is going wrong |
733 @param iq_id: IQ id | 1235 @param iq_id: IQ id |
734 @param to_jid: addressee | 1236 @param to_jid: addressee |
762 | 1264 |
763 def __init__(self, plugin_parent): | 1265 def __init__(self, plugin_parent): |
764 self.plugin_parent = plugin_parent | 1266 self.plugin_parent = plugin_parent |
765 self.host = plugin_parent.host | 1267 self.host = plugin_parent.host |
766 | 1268 |
767 def _proxyDataResult(self, iq_elt): | |
768 """Called with the information about proxy according to XEP-0065 #4 | |
769 Params should be filled with these infos""" | |
770 if iq_elt["type"] == "error": | |
771 log.warning(_("Can't determine proxy information")) | |
772 return | |
773 query_elt = iq_elt.firstChildElement() | |
774 if query_elt.name != "query": | |
775 log.warning(_("Bad answer received from proxy")) | |
776 return | |
777 streamhost_elts = filter(lambda elt: elt.name == 'streamhost', query_elt.elements()) | |
778 if not streamhost_elts: | |
779 log.warning(_("No streamhost found in stream query")) | |
780 return | |
781 if len(streamhost_elts) != 1: | |
782 log.warning(_("Multiple streamhost elements in proxy not managed, keeping only the first one")) | |
783 streamhost_elt = streamhost_elts[0] | |
784 self.host.memory.setParam("Proxy", streamhost_elt.getAttribute("jid", ""), | |
785 "File Transfer", profile_key=self.parent.profile) | |
786 self.host.memory.setParam("Proxy host", streamhost_elt.getAttribute("host", ""), | |
787 "File Transfer", profile_key=self.parent.profile) | |
788 self.host.memory.setParam("Proxy port", streamhost_elt.getAttribute("port", ""), | |
789 "File Transfer", profile_key=self.parent.profile) | |
790 | |
791 def connectionInitialized(self): | 1269 def connectionInitialized(self): |
792 def connection_ok(dummy): | 1270 self.xmlstream.addObserver(BS_REQUEST, self.plugin_parent.streamQuery, profile=self.parent.profile) |
793 self.xmlstream.addObserver(BS_REQUEST, self.plugin_parent.streamQuery, profile=self.parent.profile) | |
794 proxy = self.host.memory.getParamA("Proxy", "File Transfer", profile_key=self.parent.profile) | |
795 if not proxy: | |
796 def proxiesFound(entities): | |
797 try: | |
798 proxy_ent = entities.pop() | |
799 except KeyError: | |
800 log.info(_("No proxy found on this server")) | |
801 return | |
802 iq_elt = jabber_client.IQ(self.parent.xmlstream, 'get') | |
803 iq_elt["to"] = proxy_ent.full() | |
804 iq_elt.addElement('query', NS_BS) | |
805 iq_elt.addCallback(self._proxyDataResult) | |
806 iq_elt.send() | |
807 d = self.host.findServiceEntities("proxy", "bytestreams", profile_key=self.parent.profile) | |
808 d.addCallback(proxiesFound) | |
809 self.parent.getConnectionDeferred().addCallback(connection_ok) | |
810 | |
811 | 1271 |
812 def getDiscoInfo(self, requestor, target, nodeIdentifier=''): | 1272 def getDiscoInfo(self, requestor, target, nodeIdentifier=''): |
813 return [disco.DiscoFeature(NS_BS)] | 1273 return [disco.DiscoFeature(NS_BS)] |
814 | 1274 |
815 def getDiscoItems(self, requestor, target, nodeIdentifier=''): | 1275 def getDiscoItems(self, requestor, target, nodeIdentifier=''): |