comparison src/plugins/plugin_xep_0065.py @ 1559:7cc29634b6ef

plugin XEP-0065, XEP-0096: preparation for plugin XEP-0260 implementation: /!\ SI File Transfert (plugin XEP-0096) is temporarily broken /!\ proxy handling is temporarily broken plugin XEP-0096: use of Deferred for plugin XEP-0065 in the same way as for plugin XEP-0047 plugin XEP-0065: - use of Deferred for sessions - plugin IP is a dependency - plugin NAT-PORT is used if available - everything is now automatic, params are disabled for now (may be re-used in the future to force port or proxy) - proxy infos are managed with a namedtuple - connexion candidates are managed with a dedicate class - priorities can be used for candidates, as needed for XEP-0260 - transfer can now be managed in both direction, with client or server - socks5 server is launcher on demand, once for all profiles - helper methods to try and find best candidate - connection test and file transfer are done in 2 times
author Goffi <goffi@goffi.org>
date Mon, 02 Nov 2015 22:02:41 +0100
parents 3265a2639182
children 44854fb5d3b2
comparison
equal deleted inserted replaced
1558:6a8dd91476f0 1559:7cc29634b6ef
21 # You should have received a copy of the GNU Affero General Public License 21 # You should have received a copy of the GNU Affero General Public License
22 # along with this program. If not, see <http://www.gnu.org/licenses/>. 22 # along with this program. If not, see <http://www.gnu.org/licenses/>.
23 23
24 # -- 24 # --
25 25
26 # This program is based on proxy65 (http://code.google.com/p/proxy65), 26 # This module is based on proxy65 (http://code.google.com/p/proxy65),
27 # originaly written by David Smith and modified by Fabio Forno. 27 # originaly written by David Smith and modified by Fabio Forno.
28 # It is sublicensed under AGPL v3 (or any later version) as allowed by the original 28 # It is sublicensed under AGPL v3 (or any later version) as allowed by the original
29 # license. 29 # license.
30 30
31 # -- 31 # --
55 # THE SOFTWARE. 55 # THE SOFTWARE.
56 56
57 from sat.core.i18n import _ 57 from sat.core.i18n import _
58 from sat.core.log import getLogger 58 from sat.core.log import getLogger
59 log = getLogger(__name__) 59 log = getLogger(__name__)
60 from twisted.internet import protocol, reactor 60 from sat.core.constants import Const as C
61 from twisted.internet import error 61 from sat.core import exceptions
62 from sat.tools import sat_defer
63 from twisted.internet import protocol
64 from twisted.internet import reactor
65 from twisted.internet import error as internet_error
62 from twisted.words.protocols.jabber import jid, client as jabber_client 66 from twisted.words.protocols.jabber import jid, client as jabber_client
67 from twisted.words.protocols.jabber import error as jabber_error
63 from twisted.protocols.basic import FileSender 68 from twisted.protocols.basic import FileSender
64 from twisted.words.xish import domish 69 from twisted.words.xish import domish
65 from twisted.web.client import getPage 70 from twisted.internet import defer
71 from twisted.python import failure
66 from sat.core.exceptions import ProfileNotInCacheError 72 from sat.core.exceptions import ProfileNotInCacheError
73 from collections import namedtuple
67 import struct 74 import struct
68 import hashlib 75 import hashlib
76 import uuid
69 77
70 from zope.interface import implements 78 from zope.interface import implements
71 79
72 try: 80 try:
73 from twisted.words.protocols.xmlstream import XMPPHandler 81 from twisted.words.protocols.xmlstream import XMPPHandler
78 86
79 IQ_SET = '/iq[@type="set"]' 87 IQ_SET = '/iq[@type="set"]'
80 NS_BS = 'http://jabber.org/protocol/bytestreams' 88 NS_BS = 'http://jabber.org/protocol/bytestreams'
81 BS_REQUEST = IQ_SET + '/query[@xmlns="' + NS_BS + '"]' 89 BS_REQUEST = IQ_SET + '/query[@xmlns="' + NS_BS + '"]'
82 TIMEOUT = 60 # timeout for workflow 90 TIMEOUT = 60 # timeout for workflow
91 DEFER_KEY = 'finished' # key of the deferred used to track session end
92 SERVER_STARTING_PORT = 0 # starting number for server port search (0 to ask automatic attribution)
93
94 # priorities are candidates local priorities, must be a int between 0 and 65535
95 PRIORITY_BEST_DIRECT = 10000
96 PRIORITY_DIRECT = 5000
97 PRIORITY_ASSISTED = 1000
98 PRIORITY_PROXY = 0.2 # proxy is the last option for s5b
99 CANDIDATE_DELAY = 0.2 # see XEP-0260 §4
100 CANDIDATE_DELAY_PROXY = 0.2 # additional time for proxy types (see XEP-0260 §4 note 3)
83 101
84 PLUGIN_INFO = { 102 PLUGIN_INFO = {
85 "name": "XEP 0065 Plugin", 103 "name": "XEP 0065 Plugin",
86 "import_name": "XEP-0065", 104 "import_name": "XEP-0065",
87 "type": "XEP", 105 "type": "XEP",
88 "protocols": ["XEP-0065"], 106 "protocols": ["XEP-0065"],
107 "dependencies": ["IP"],
108 "recommendations": ["NAT-PORT"],
89 "main": "XEP_0065", 109 "main": "XEP_0065",
90 "handler": "yes", 110 "handler": "yes",
91 "description": _("""Implementation of SOCKS5 Bytestreams""") 111 "description": _("""Implementation of SOCKS5 Bytestreams""")
92 } 112 }
93 113
94 STATE_INITIAL = 0 114 # XXX: by default eveything is automatic
95 STATE_AUTH = 1 115 # TODO: use these params to force use of specific proxy/port/IP
96 STATE_REQUEST = 2 116 # PARAMS = """
97 STATE_READY = 3 117 # <params>
98 STATE_AUTH_USERPASS = 4 118 # <general>
99 STATE_TARGET_INITIAL = 5 119 # <category name="File Transfer">
100 STATE_TARGET_AUTH = 6 120 # <param name="Force IP" type="string" />
101 STATE_TARGET_REQUEST = 7 121 # <param name="Force Port" type="int" constraint="1;65535" />
102 STATE_TARGET_READY = 8 122 # </category>
103 STATE_LAST = 9 123 # </general>
104 124 # <individual>
105 STATE_CONNECT_PENDING = STATE_LAST + 1 125 # <category name="File Transfer">
126 # <param name="Force Proxy" value="" type="string" />
127 # <param name="Force Proxy host" value="" type="string" />
128 # <param name="Force Proxy port" value="" type="int" constraint="1;65535" />
129 # </category>
130 # </individual>
131 # </params>
132 # """
133
134 (STATE_INITIAL,
135 STATE_AUTH,
136 STATE_REQUEST,
137 STATE_READY,
138 STATE_AUTH_USERPASS,
139 STATE_CLIENT_INITIAL,
140 STATE_CLIENT_AUTH,
141 STATE_CLIENT_REQUEST,
142 ) = xrange(8)
106 143
107 SOCKS5_VER = 0x05 144 SOCKS5_VER = 0x05
108 145
109 ADDR_IPV4 = 0x01 146 ADDR_IPV4 = 0x01
110 ADDR_DOMAINNAME = 0x03 147 ADDR_DOMAINNAME = 0x03
127 REPLY_TTL_EXPIRED = 0x06 164 REPLY_TTL_EXPIRED = 0x06
128 REPLY_CMD_NOT_SUPPORTED = 0x07 165 REPLY_CMD_NOT_SUPPORTED = 0x07
129 REPLY_ADDR_NOT_SUPPORTED = 0x08 166 REPLY_ADDR_NOT_SUPPORTED = 0x08
130 167
131 168
132 def calculateHash(from_jid, to_jid, sid): 169 ProxyInfos = namedtuple("ProxyInfos", ['host', 'jid', 'port'])
133 """Calculate SHA1 Hash according to XEP-0065 170
134 @param from_jid: jid of the requester 171
135 @param to_jid: jid of the target 172 class Candidate(object):
136 @param sid: session id 173
137 @return: hash (string)""" 174 def __init__(self, host, port, type_, priority, jid_, id_=None, priority_local=False, factory=None):
175 """
176 @param host(unicode): host IP or domain
177 @param port(int): port
178 @param type_(unicode): stream type (one of XEP_0065.TYPE_*)
179 @param priority(int): priority
180 @param jid_(jid.JID): jid
181 @param id_(None, id_): Candidate ID, or None to generate
182 @param priority_local(bool): if True, priority is used as local priority,
183 else priority is used as global one (and local priority is set to 0)
184 """
185 assert isinstance(jid_, jid.JID)
186 self.host, self.port, self.type, self.jid = (
187 host, int(port), type_, jid_)
188 self.id = id_ if id_ is not None else unicode(uuid.uuid4())
189 if priority_local:
190 self._local_priority = int(priority)
191 self._priority = self.calculatePriority()
192 else:
193 self._local_priority = 0
194 self._priority = int(priority)
195 self.factory = factory
196
197 def discard(self):
198 """Disconnect a candidate if it is connected
199
200 Used to disconnect tryed client when they are discarded
201 """
202 log.debug(u"Discarding {}".format(self))
203 try:
204 self.factory.discard()
205 except AttributeError:
206 pass # no discard for Socks5ServerFactory
207
208 @property
209 def local_priority(self):
210 return self._local_priority
211
212 @property
213 def priority(self):
214 return self._priority
215
216 def __str__(self):
217 # similar to __unicode__ but we don't show jid and we encode id
218 return "Candidate ({0.priority}): host={0.host} port={0.port} type={0.type}{id}".format(
219 self,
220 id=u" id={}".format(self.id if self.id is not None else u'').encode('utf-8', 'ignore'),
221 )
222
223 def __unicode__(self):
224 return u"Candidate ({0.priority}): host={0.host} port={0.port} jid={0.jid} type={0.type}{id}".format(
225 self,
226 id=u" id={}".format(self.id if self.id is not None else u''),
227 )
228
229 def __eq__(self, other):
230 # self.id is is not used in __eq__ as the same candidate can have
231 # different ids if proposed by initiator or responder
232 try:
233 return (self.host == other.host and
234 self.port == other.port and
235 self.jid == other.jid)
236 except (AttributeError, TypeError):
237 return False
238
239 def __ne__(self, other):
240 return not self.__eq__(other)
241
242 def calculatePriority(self):
243 """Calculate candidate priority according to XEP-0260 §2.2
244
245
246 @return (int): priority
247 """
248 if self.type == XEP_0065.TYPE_DIRECT:
249 multiplier = 126
250 elif self.type == XEP_0065.TYPE_ASSISTED:
251 multiplier = 120
252 elif self.type == XEP_0065.TYPE_TUNEL:
253 multiplier = 110
254 elif self.type == XEP_0065.TYPE_PROXY:
255 multiplier = 10
256 else:
257 raise exceptions.InternalError(u"Unknown {} type !".format(self.type))
258 return 2**16 * multiplier + self._local_priority
259
260 def startTransfer(self, session_hash=None):
261 self.factory.startTransfer(session_hash)
262
263
264 def getSessionHash(from_jid, to_jid, sid):
265 """Calculate SHA1 Hash according to XEP-0065 §5.3.2
266
267 @param from_jid(jid.JID): jid of the requester
268 @param to_jid(jid.JID): jid of the target
269 @param sid(unicode): session id
270 @return (str): hash
271 """
138 return hashlib.sha1((sid + from_jid.full() + to_jid.full()).encode('utf-8')).hexdigest() 272 return hashlib.sha1((sid + from_jid.full() + to_jid.full()).encode('utf-8')).hexdigest()
139 273
140 274
141 class SOCKSv5(protocol.Protocol, FileSender): 275 class SOCKSv5(protocol.Protocol, FileSender):
142 def __init__(self): 276
277 def __init__(self, session_hash=None):
278 """
279 @param session_hash(str): hash of the session
280 must only be used in client mode
281 """
143 log.debug(_("Protocol init")) 282 log.debug(_("Protocol init"))
144 self.state = STATE_INITIAL 283 self.connection = defer.Deferred() # called when connection/auth is done
284 if session_hash is not None:
285 self.server_mode = False
286 self._session_hash = session_hash
287 self.state = STATE_CLIENT_INITIAL
288 else:
289 self.server_mode = True
290 self.state = STATE_INITIAL
145 self.buf = "" 291 self.buf = ""
146 self.supportedAuthMechs = [AUTHMECH_ANON] 292 self.supportedAuthMechs = [AUTHMECH_ANON]
147 self.supportedAddrs = [ADDR_DOMAINNAME] 293 self.supportedAddrs = [ADDR_DOMAINNAME]
148 self.enabledCommands = [CMD_CONNECT] 294 self.enabledCommands = [CMD_CONNECT]
149 self.peersock = None 295 self.peersock = None
150 self.addressType = 0 296 self.addressType = 0
151 self.requestType = 0 297 self.requestType = 0
298 self._file_obj = None
299
300 @property
301 def file_obj(self):
302 if self._file_obj is None:
303 if self.server_mode:
304 self._file_obj = self.factory.getSession(self._session_hash)["file"]
305 else:
306 self._file_obj = self.factory.getSession()['file']
307 return self._file_obj
152 308
153 def _startNegotiation(self): 309 def _startNegotiation(self):
154 log.debug("_startNegotiation") 310 log.debug("starting negotiation (client mode)")
155 self.state = STATE_TARGET_AUTH 311 self.state = STATE_CLIENT_AUTH
156 self.transport.write(struct.pack('!3B', SOCKS5_VER, 1, AUTHMECH_ANON)) 312 self.transport.write(struct.pack('!3B', SOCKS5_VER, 1, AUTHMECH_ANON))
157 313
158 def _parseNegotiation(self): 314 def _parseNegotiation(self):
159 log.debug("_parseNegotiation") 315 log.debug("_parseNegotiation")
160 try: 316 try:
182 # Complete negotiation w/ this method 338 # Complete negotiation w/ this method
183 self.transport.write(struct.pack('!BB', SOCKS5_VER, m)) 339 self.transport.write(struct.pack('!BB', SOCKS5_VER, m))
184 return 340 return
185 341
186 # No supported mechs found, notify client and close the connection 342 # No supported mechs found, notify client and close the connection
343 log.warning(u"Unsupported authentication mechanism")
187 self.transport.write(struct.pack('!BB', SOCKS5_VER, AUTHMECH_INVALID)) 344 self.transport.write(struct.pack('!BB', SOCKS5_VER, AUTHMECH_INVALID))
188 self.transport.loseConnection() 345 self.transport.loseConnection()
189 except struct.error: 346 except struct.error:
190 pass 347 pass
191 348
256 else: 413 else:
257 # Any other command is not supported 414 # Any other command is not supported
258 self.sendErrorReply(REPLY_CMD_NOT_SUPPORTED) 415 self.sendErrorReply(REPLY_CMD_NOT_SUPPORTED)
259 416
260 except struct.error: 417 except struct.error:
418 # The buffer is probably not complete, we need to wait more
261 return None 419 return None
262 420
263 def _makeRequest(self): 421 def _makeRequest(self):
264 log.debug("_makeRequest") 422 log.debug("_makeRequest")
265 self.state = STATE_TARGET_REQUEST 423 # sha1 = getSessionHash(self.data["from"], self.data["to"], self.sid)
266 sha1 = calculateHash(self.data["from"], self.data["to"], self.sid) 424 hash_ = self._session_hash
267 request = struct.pack('!5B%dsH' % len(sha1), SOCKS5_VER, CMD_CONNECT, 0, ADDR_DOMAINNAME, len(sha1), sha1, 0) 425 request = struct.pack('!5B%dsH' % len(hash_), SOCKS5_VER, CMD_CONNECT, 0, ADDR_DOMAINNAME, len(hash_), hash_, 0)
268 self.transport.write(request) 426 self.transport.write(request)
427 self.state = STATE_CLIENT_REQUEST
269 428
270 def _parseRequestReply(self): 429 def _parseRequestReply(self):
271 log.debug("_parseRequestReply") 430 log.debug("_parseRequestReply")
272 try: 431 try:
273 ver, rep, rsvd, self.addressType = struct.unpack('!BBBB', self.buf[:4]) 432 ver, rep, rsvd, self.addressType = struct.unpack('!BBBB', self.buf[:4])
292 # Ensure reply is OK 451 # Ensure reply is OK
293 if rep != REPLY_SUCCESS: 452 if rep != REPLY_SUCCESS:
294 self.loseConnection() 453 self.loseConnection()
295 return 454 return
296 455
297 if self.factory.proxy: 456 # if self.factory.proxy:
298 self.state = STATE_READY 457 # self.state = STATE_READY
299 self.factory.activateCb(self.sid, self.factory.iq_id, self.startTransfer, self.profile) 458 # self.factory.activateCb(self.sid, self.factory.iq_id, self.startTransfer, self.profile)
300 else: 459 # else:
301 self.state = STATE_TARGET_READY 460 self.state = STATE_READY
302 self.factory.activateCb(self.sid, self.factory.iq_id, self.profile) 461 self.connection.callback(None)
462 # self.factory.activateCb(self.sid, self.factory.iq_id, self.profile)
303 463
304 except struct.error: 464 except struct.error:
465 # The buffer is probably not complete, we need to wait more
305 return None 466 return None
306 467
307 def connectionMade(self): 468 def connectionMade(self):
308 log.debug(u"connectionMade (mode = %s)" % "requester" if isinstance(self.factory, Socks5ServerFactory) else "target") 469 log.debug(u"Socks5 connectionMade (mode = {})".format("server" if self.state == STATE_INITIAL else "client"))
309 470 if self.state == STATE_CLIENT_INITIAL:
310 if isinstance(self.factory, Socks5ClientFactory):
311 self.sid = self.factory.sid
312 self.profile = self.factory.profile
313 self.data = self.factory.data
314 self.state = STATE_TARGET_INITIAL
315 self._startNegotiation() 471 self._startNegotiation()
316 472
317 def connectRequested(self, addr, port): 473 def connectRequested(self, addr, port):
318 log.debug("connectRequested") 474 log.debug("connectRequested")
319 475
320 # Check that this session is expected 476 # Check that this session is expected
321 if addr not in self.factory.hash_sid_map: 477 if not self.factory.addToSession(addr, self):
322 #no: we refuse it
323 self.sendErrorReply(REPLY_CONN_REFUSED) 478 self.sendErrorReply(REPLY_CONN_REFUSED)
479 log.warning(u"Unexpected connection request received from {host}"
480 .format(host=self.transport.getPeer().host))
324 return 481 return
325 self.sid, self.profile = self.factory.hash_sid_map[addr] 482 self._session_hash = addr
326 client = self.factory.host.getClient(self.profile) 483 # self.sid, self.profile = self.factory.hash_profiles_map[addr]
327 client.xep_0065_current_stream[self.sid]["start_transfer_cb"] = self.startTransfer 484 # client = self.factory.host.getClient(self.profile)
485 # client.xep_0065_current_stream[self.sid]["start_transfer_cb"] = self.startTransfer
328 self.connectCompleted(addr, 0) 486 self.connectCompleted(addr, 0)
329 self.transport.stopReading() 487
330 488 def startTransfer(self):
331 def startTransfer(self, file_obj):
332 """Callback called when the result iq is received""" 489 """Callback called when the result iq is received"""
333 d = self.beginFileTransfer(file_obj, self.transport) 490 log.debug(u"Starting file transfer")
491 d = self.beginFileTransfer(self.file_obj, self.transport)
334 d.addCallback(self.fileTransfered) 492 d.addCallback(self.fileTransfered)
335 493
336 def fileTransfered(self, d): 494 def fileTransfered(self, d):
337 log.info(_("File transfer completed, closing connection")) 495 log.info(_("File transfer completed, closing connection"))
338 self.transport.loseConnection() 496 self.transport.loseConnection()
339 self.factory.finishedCb(self.sid, True, self.profile)
340 497
341 def connectCompleted(self, remotehost, remoteport): 498 def connectCompleted(self, remotehost, remoteport):
342 log.debug("connectCompleted") 499 log.debug("connectCompleted")
343 if self.addressType == ADDR_IPV4: 500 if self.addressType == ADDR_IPV4:
344 result = struct.pack('!BBBBIH', SOCKS5_VER, REPLY_SUCCESS, 0, 1, remotehost, remoteport) 501 result = struct.pack('!BBBBIH', SOCKS5_VER, REPLY_SUCCESS, 0, 1, remotehost, remoteport)
355 # FIXME: implement authentication and remove the debug printing a password 512 # FIXME: implement authentication and remove the debug printing a password
356 log.debug(u"User/pass: %s/%s" % (user, passwd)) 513 log.debug(u"User/pass: %s/%s" % (user, passwd))
357 return True 514 return True
358 515
359 def dataReceived(self, buf): 516 def dataReceived(self, buf):
360 if self.state == STATE_TARGET_READY: 517 if self.state == STATE_READY:
361 self.data["file_obj"].write(buf) 518 # Everything is set, we just have to write the incoming data
519 self.file_obj.write(buf)
362 return 520 return
363 521
364 self.buf = self.buf + buf 522 self.buf = self.buf + buf
365 if self.state == STATE_INITIAL: 523 if self.state == STATE_INITIAL:
366 self._parseNegotiation() 524 self._parseNegotiation()
367 if self.state == STATE_AUTH_USERPASS: 525 if self.state == STATE_AUTH_USERPASS:
368 self._parseUserPass() 526 self._parseUserPass()
369 if self.state == STATE_REQUEST: 527 if self.state == STATE_REQUEST:
370 self._parseRequest() 528 self._parseRequest()
371 if self.state == STATE_TARGET_AUTH: 529 if self.state == STATE_CLIENT_REQUEST:
530 self._parseRequestReply()
531 if self.state == STATE_CLIENT_AUTH:
372 ver, method = struct.unpack('!BB', buf) 532 ver, method = struct.unpack('!BB', buf)
373 self.buf = self.buf[2:] 533 self.buf = self.buf[2:]
374 if ver != SOCKS5_VER or method != AUTHMECH_ANON: 534 if ver != SOCKS5_VER or method != AUTHMECH_ANON:
375 self.transport.loseConnection() 535 self.transport.loseConnection()
376 else: 536 else:
377 self._makeRequest() 537 self._makeRequest()
378 if self.state == STATE_TARGET_REQUEST:
379 self._parseRequestReply()
380
381 def clientConnectionLost(self, reason):
382 log.debug("clientConnectionLost")
383 self.transport.loseConnection()
384 538
385 def connectionLost(self, reason): 539 def connectionLost(self, reason):
386 log.debug("connectionLost") 540 log.debug(u"Socks5 connection lost: {}".format(reason.value))
387 if self.state != STATE_CONNECT_PENDING: 541 # self.transport.unregisterProducer()
388 self.transport.unregisterProducer() 542 # if self.peersock is not None:
389 if self.peersock is not None: 543 # self.peersock.peersock = None
390 self.peersock.peersock = None 544 # self.peersock.transport.unregisterProducer()
391 self.peersock.transport.unregisterProducer() 545 # self.peersock = None
392 self.peersock = None 546 if self.state != STATE_READY:
547 self.connection.errback(reason)
548 if self.server_mode :
549 self.factory.removeFromSession(self._session_hash, self, reason)
393 550
394 551
395 class Socks5ServerFactory(protocol.ServerFactory): 552 class Socks5ServerFactory(protocol.ServerFactory):
396 protocol = SOCKSv5 553 protocol = SOCKSv5
397 554
398 def __init__(self, host, hash_sid_map, finishedCb): 555 def __init__(self, parent):
399 self.host = host 556 """
400 self.hash_sid_map = hash_sid_map 557 @param parent(XEP_0065): XEP_0065 parent instance
401 self.finishedCb = finishedCb 558 """
402 559 self.parent = parent
403 def startedConnecting(self, connector): 560
404 log.debug(_("Socks 5 server connection started")) 561 def getSession(self, session_hash):
405 562 return self.parent.getSession(session_hash, None)
406 def clientConnectionLost(self, connector, reason): 563
407 log.debug(_(u"Socks 5 server connection lost (reason: %s)") % reason) 564 def startTransfer(self, session_hash):
565 session = self.getSession(session_hash)
566 try:
567 protocol = session['protocols'][0]
568 except (KeyError, IndexError):
569 log.error(u"Can't start file transfer, can't find protocol")
570 else:
571 protocol.startTransfer()
572
573 def addToSession(self, session_hash, protocol):
574 """Check is session_hash is valid, and associate protocol with it
575
576 the session will be associated to the corresponding candidate
577 @param session_hash(str): hash of the session
578 @param protocol(SOCKSv5): protocol instance
579 @param return(bool): True if hash was valid (i.e. expected), False else
580 """
581 try:
582 session_data = self.getSession(session_hash)
583 except KeyError:
584 return False
585 else:
586 session_data.setdefault('protocols', []).append(protocol)
587 return True
588
589 def removeFromSession(self, session_hash, protocol, reason):
590 """Remove a protocol from session_data
591
592 There can be several protocol instances while candidates are tried, they
593 have removed when candidate connection is closed
594 @param session_hash(str): hash of the session
595 @param protocol(SOCKSv5): protocol instance
596 @param reason(failure.Failure): reason of the removal
597 """
598 try:
599 protocols = self.getSession(session_hash)['protocols']
600 protocols.remove(protocol)
601 except (KeyError, ValueError):
602 log.error(u"Protocol not found in session while it should be there")
603 else:
604 if not protocols:
605 # The last protocol has been removed, session is finished
606 if reason.check(internet_error.ConnectionDone):
607 self.getSession(session_hash)[DEFER_KEY].callback(None)
608 else:
609 self.getSession(session_hash)[DEFER_KEY].errback(reason)
408 610
409 611
410 class Socks5ClientFactory(protocol.ClientFactory): 612 class Socks5ClientFactory(protocol.ClientFactory):
411 protocol = SOCKSv5 613 protocol = SOCKSv5
412 614
413 def __init__(self, current_stream, sid, iq_id, activateCb, finishedCb, proxy=False, profile=None): 615 # def __init__(self, stream_data, sid, iq_id, activateCb, finishedCb, proxy=False, profile=C.PROF_KEY_NONE):
616 def __init__(self, parent, session_hash, profile):
414 """Init the Client Factory 617 """Init the Client Factory
415 @param current_stream: current streams data 618
416 @param sid: Session ID 619 @param session_hash(unicode): hash of the session
417 @param iq_id: iq id used to initiate the stream 620 hash is the same as hostname computer in XEP-0065 § 5.3.2 #1
418 @param activateCb: method to call to activate the stream 621 @param profile(unciode): %(doc_profile)s
419 @param finishedCb: method to call when the stream session is finished 622 """
420 @param proxy: True if we are connecting throught a proxy (and we are a requester) 623 self.session = parent.getSession(session_hash, profile)
421 @param profile: %(doc_profile)s""" 624 self.session_hash = session_hash
422 assert(profile)
423 self.data = current_stream[sid]
424 self.sid = sid
425 self.iq_id = iq_id
426 self.activateCb = activateCb
427 self.finishedCb = finishedCb
428 self.proxy = proxy
429 self.profile = profile 625 self.profile = profile
430 626 self.connection = defer.Deferred()
431 def startedConnecting(self, connector): 627 self._protocol_instance = None
432 log.debug(_("Socks 5 client connection started")) 628 self.connector = None
629 self._discarded = False
630 # self.data = stream_data[sid]
631 # self.sid = sid
632 # self.iq_id = iq_id
633 # self.activateCb = activateCb
634 # self.finishedCb = finishedCb
635 # self.proxy = proxy
636 # self.profile = profile
637
638 def discard(self):
639 """Disconnect the client
640
641 Also set a discarded flag, which avoid to call the session Deferred
642 """
643 self.connector.disconnect()
644 self._discarded = True
645
646 def getSession(self):
647 return self.session
648
649 def startTransfer(self, dummy=None):
650 self._protocol_instance.startTransfer()
651
652 def clientConnectionFailed(self, connector, reason):
653 log.debug(u"Connection failed")
654 self.connection.errback(reason)
433 655
434 def clientConnectionLost(self, connector, reason): 656 def clientConnectionLost(self, connector, reason):
435 log.debug(_(u"Socks 5 client connection lost (reason: %s)") % reason) 657 log.debug(_(u"Socks 5 client connection lost (reason: %s)") % reason.value)
436 self.finishedCb(self.sid, reason.type == error.ConnectionDone, self.profile) # TODO: really check if the state is actually successful 658 self._protocol_instance = None
659 if not self._discarded:
660 # This one was used for the transfer, than mean that
661 # the Socks5 session is finished
662 if reason.check(internet_error.ConnectionDone):
663 self.getSession()[DEFER_KEY].callback(None)
664 else:
665 self.getSession()[DEFER_KEY].errback(reason)
666 # self.finishedCb(self.sid, reason.type == internet_error.ConnectionDone, self.profile) # TODO: really check if the state is actually successful
667
668 def buildProtocol(self, addr):
669 log.debug(("Socks 5 client connection started"))
670 p = self.protocol(session_hash=self.session_hash)
671 p.factory = self
672 p.connection.chainDeferred(self.connection)
673 self._protocol_instance = p
674 return p
437 675
438 676
439 class XEP_0065(object): 677 class XEP_0065(object):
440
441 NAMESPACE = NS_BS 678 NAMESPACE = NS_BS
442 679 TYPE_DIRECT = 'direct'
443 params = """ 680 TYPE_ASSISTED = 'assisted'
444 <params> 681 TYPE_TUNEL = 'tunel'
445 <general> 682 TYPE_PROXY = 'proxy'
446 <category name="File Transfer"> 683 Candidate = Candidate
447 <param name="IP" value='0.0.0.0' default_cb='yes' type="string" />
448 <param name="Port" value="28915" type="int" constraint="1;65535" />
449 </category>
450 </general>
451 <individual>
452 <category name="File Transfer">
453 <param name="Proxy" value="" type="string" />
454 <param name="Proxy host" value="" type="string" />
455 <param name="Proxy port" value="" type="int" constraint="1;65535" />
456 </category>
457 </individual>
458 </params>
459 """
460 684
461 def __init__(self, host): 685 def __init__(self, host):
462 log.info(_("Plugin XEP_0065 initialization")) 686 log.info(_("Plugin XEP_0065 initialization"))
463
464 #session data
465 self.hash_sid_map = {} # key: hash of the transfer session, value: (session id, profile)
466
467 self.host = host 687 self.host = host
468 log.debug(_("registering")) 688
469 self.server_factory = Socks5ServerFactory(host, self.hash_sid_map, lambda sid, success, profile: self._killId(sid, success, profile=profile)) 689 # session data
470 690 self.hash_profiles_map = {} # key: hash of the transfer session, value: session data
471 #parameters 691 self._cache_proxies = {} # key: server jid, value: proxy data
472 host.memory.updateParams(XEP_0065.params) 692
473 host.memory.setDefault("IP", "File Transfer", self.getExternalIP) 693 # misc data
474 port = int(self.host.memory.getParamA("Port", "File Transfer")) 694 self._server_factory = None
475 695 self._external_port = None
476 log.info(_("Launching Socks5 Stream server on port %d") % port) 696
477 reactor.listenTCP(port, self.server_factory) 697 # plugins shortcuts
698 self._ip = self.host.plugins['IP']
699 try:
700 self._np = self.host.plugins['NAT-PORT']
701 except KeyError:
702 log.debug(u"NAT Port plugin not available")
703 self._np = None
704
705 # parameters
706 # XXX: params are not used for now, but they may be used in the futur to force proxy/IP
707 # host.memory.updateParams(PARAMS)
478 708
479 def getHandler(self, profile): 709 def getHandler(self, profile):
480 return XEP_0065_handler(self) 710 return XEP_0065_handler(self)
481 711
482 def profileConnected(self, profile): 712 def profileConnected(self, profile):
483 client = self.host.getClient(profile) 713 client = self.host.getClient(profile)
484 client.xep_0065_current_stream = {} # key: stream_id, value: data(dict) 714 client.xep_0065_current_stream = {} # key: stream_id, value: session_data(dict)
485 715 client._s5b_sessions = {}
486 def getExternalIP(self): 716
487 """Return IP visible from outside, by asking to a website""" 717 def getSessionHash(self, from_jid, to_jid, sid):
488 return getPage("http://www.goffi.org/sat_tools/get_ip.php") 718 return getSessionHash(from_jid, to_jid, sid)
489 719
490 def getProgress(self, sid, data, profile): 720 def getSocks5ServerFactory(self):
491 """Fill data with position of current transfer""" 721 """Return server factory
722
723 The server is created if it doesn't exists yet
724 self._server_factory_port is set on server creation
725 """
726
727 if self._server_factory is None:
728 # self._server_factory = Socks5ServerFactory(self.host, self.hash_profiles_map, lambda sid, client: self._killSession(sid, client))
729 self._server_factory = Socks5ServerFactory(self)
730 for port in xrange(SERVER_STARTING_PORT, 65356):
731 try:
732 listening_port = reactor.listenTCP(port, self._server_factory)
733 except internet_error.CannotListenError as e:
734 log.debug(u"Cannot listen on port {port}: {err_msg}{err_num}".format(
735 port=port,
736 err_msg=e.socketError.strerror,
737 err_num=u' (error code: {})'.format(e.socketError.errno),
738 ))
739 else:
740 self._server_factory_port = listening_port.getHost().port
741 break
742
743 log.info(_("Socks5 Stream server launched on port {}").format(self._server_factory_port))
744 return self._server_factory
745
746 @defer.inlineCallbacks
747 def getProxy(self, profile):
748 """Return the proxy available for this profile
749
750 cache is used between profiles using the same server
751 @param profile: %(doc_profile)s
752 @return ((D)(ProxyInfos, None)): Found proxy infos,
753 or None if not acceptable proxy is found
754 """
755 def notFound(server):
756 log.info(u"No proxy found on this server")
757 self._cache_proxies[server] = None
758 defer.returnValue(None)
492 client = self.host.getClient(profile) 759 client = self.host.getClient(profile)
493 try: 760 server = client.jid.host
494 file_obj = client.xep_0065_current_stream[sid]["file_obj"] 761 try:
495 data["position"] = str(file_obj.tell()) 762 defer.returnValue(self._cache_proxies[server])
496 data["size"] = str(client.xep_0065_current_stream[sid]["size"]) 763 except KeyError:
497 except:
498 pass 764 pass
499 765 try:
500 def _timeOut(self, sid, profile): 766 proxy = (yield self.host.findServiceEntities('proxy', 'bytestreams', profile=profile)).pop()
767 except (exceptions.CancelError, StopIteration):
768 notFound(server)
769 iq_elt = client.IQ('get')
770 iq_elt['to'] = proxy.full()
771 iq_elt.addElement('query', NS_BS)
772
773 try:
774 result_elt = yield iq_elt.send()
775 except jabber_error.StanzaError as failure:
776 log.warning(u"Error while requesting proxy info on {jid}: {error}"
777 .format(proxy.full(), failure))
778 notFound(server)
779
780 try:
781 query_elt = result_elt.elements(NS_BS, 'query').next()
782 streamhost_elt = query_elt.elements(NS_BS, 'streamhost').next()
783 host = streamhost_elt['host']
784 jid_ = streamhost_elt['jid']
785 port = streamhost_elt['port']
786 if not all((host, jid, port)):
787 raise KeyError
788 jid_ = jid.JID(jid_)
789 except (StopIteration, KeyError, RuntimeError, jid.InvalidFormat):
790 log.warning(u"Invalid proxy data received from {}".format(proxy.full()))
791 notFound(server)
792
793 proxy_infos = self._cache_proxies[server] = ProxyInfos(host, jid_, port)
794 log.info(u"Proxy found: {}".format(proxy_infos))
795 defer.returnValue(proxy_infos)
796
797 @defer.inlineCallbacks
798 def _getNetworkData(self, client):
799 """Retrieve information about network
800
801 @param client: %(doc_client)s
802 @return (D(tuple[local_port, external_port, local_ips, external_ip])): network data
803 """
804 self.getSocks5ServerFactory()
805 local_port = self._server_factory_port
806 external_ip = yield self._ip.getExternalIP(client.profile)
807 local_ips = yield self._ip.getLocalIPs(client.profile)
808
809 if not local_ips:
810 log.warning(u"Can't find local IPs, we can't do direct connection")
811 else:
812 if external_ip is not None and self._external_port is None:
813 if external_ip != local_ips[0]:
814 log.info(u"We are probably behind a NAT")
815 if self._np is None:
816 log.warning(u"NAT port plugin not available, we can't map port")
817 else:
818 ext_port = yield self._np.mapPort(local_port, desc=u"SaT socks5 stream")
819 if ext_port is None:
820 log.warning(u"Can't map NAT port")
821 else:
822 self._external_port = ext_port
823
824 defer.returnValue((local_port, self._external_port, local_ips, external_ip))
825
826 @defer.inlineCallbacks
827 def getCandidates(self, profile):
828 """Return a list of our stream candidates
829
830 @param profile: %(doc_profile)s
831 @return (D(list[Candidate])): list of candidates, ordered by priority
832 """
833 client = self.host.getClient(profile)
834 server_factory = yield self.getSocks5ServerFactory()
835 local_port, ext_port, local_ips, external_ip = yield self._getNetworkData(client)
836 proxy = yield self.getProxy(profile)
837
838 # its time to gather the candidates
839 candidates = []
840
841 # first the direct ones
842 if local_ips:
843 # the preferred direct connection
844 ip = local_ips.pop(0)
845 candidates.append(Candidate(ip, local_port, XEP_0065.TYPE_DIRECT, PRIORITY_BEST_DIRECT, client.jid, priority_local=True, factory=server_factory))
846 for ip in local_ips:
847 candidates.append(Candidate(ip, local_port, XEP_0065.TYPE_DIRECT, PRIORITY_DIRECT, client.jid, priority_local=True, factory=server_factory))
848
849 # then the assisted one
850 if ext_port is not None:
851 candidates.append(Candidate(external_ip, ext_port, XEP_0065.TYPE_ASSISTED, PRIORITY_ASSISTED, client.jid, priority_local=True, factory=server_factory))
852
853 # finally the proxy
854 if proxy:
855 candidates.append(Candidate(proxy.host, proxy.port, XEP_0065.TYPE_PROXY, PRIORITY_PROXY, proxy.jid, priority_local=True))
856
857 # should be already sorted, but just in case the priorities get weird
858 candidates.sort(key=lambda c: c.priority, reverse=True)
859
860 defer.returnValue(candidates)
861
862 def _addConnector(self, connector, candidate):
863 """Add connector used to connect to candidate, and return client factory's connection Deferred
864
865 the connector can be used to disconnect the candidate, and returning the factory's connection Deferred allow to wait for connection completion
866 @param connector: a connector implementing IConnector
867 @param candidate(Candidate): candidate linked to the connector
868 @return (D): Deferred fired when factory connection is done or has failed
869 """
870 candidate.factory.connector = connector
871 return candidate.factory.connection
872
873 def tryCandidates(self, candidates, session_hash, connection_cb=None, connection_eb=None, profile=C.PROF_KEY_NONE):
874 defers_list = []
875
876 for candidate in candidates:
877 factory = Socks5ClientFactory(self, session_hash, profile)
878 candidate.factory = factory
879 delay = CANDIDATE_DELAY * len(defers_list)
880 if candidate.type == XEP_0065.TYPE_PROXY:
881 delay += CANDIDATE_DELAY_PROXY
882 d = sat_defer.DelayedDeferred(delay, candidate.host)
883 d.addCallback(reactor.connectTCP, candidate.port, factory)
884 d.addCallback(self._addConnector, candidate)
885 if connection_cb is not None:
886 d.addCallback(lambda dummy, candidate=candidate, profile=profile: connection_cb(candidate, profile))
887 if connection_eb is not None:
888 d.addErrback(connection_eb, candidate, profile)
889 defers_list.append(d)
890
891 return defers_list
892
893 def getBestCandidate(self, candidates, session_hash, profile=C.PROF_KEY_NONE):
894 defer_candidates = None
895
896 def connectionCb(candidate, profile):
897 log.info(u"Connection of {} successful".format(unicode(candidate)))
898 for idx, other_candidate in enumerate(candidates):
899 try:
900 if other_candidate.priority < candidate.priority:
901 log.debug(u"Cancelling {}".format(other_candidate))
902 defer_candidates[idx].cancel()
903 except AttributeError:
904 assert other_candidate is None
905
906 def connectionEb(failure, candidate, profile):
907 if failure.check(defer.CancelledError):
908 log.debug(u"Connection of {} has been cancelled".format(candidate))
909 else:
910 log.info(u"Connection of {candidate} Failed: {error}".format(
911 candidate = candidate,
912 error = failure.value))
913 candidates[candidates.index(candidate)] = None
914
915 def allTested(self):
916 log.debug(u"All candidates have been tested")
917 good_candidates = [c for c in candidates if c]
918 return good_candidates[0] if good_candidates else None
919
920 defer_candidates = self.tryCandidates(candidates, session_hash, connectionCb, connectionEb, profile)
921 d_list = defer.DeferredList(defer_candidates)
922 d_list.addCallback(allTested)
923 return d_list
924
925 def _timeOut(self, sid, client):
501 """Delecte current_stream id, called after timeout 926 """Delecte current_stream id, called after timeout
502 @param id: id of client.xep_0065_current_stream""" 927 @param id: id of client.xep_0065_current_stream"""
503 log.info(_("Socks5 Bytestream: TimeOut reached for id %(sid)s [%(profile)s]") 928 log.info(_("Socks5 Bytestream: TimeOut reached for id {sid} [{profile}]").format(
504 % {"sid": sid, "profile": profile}) 929 sid=sid, profile=client.profile))
505 self._killId(sid, False, "TIMEOUT", profile) 930 self._killSession(sid, client, u"TIMEOUT")
506 931
507 def _killId(self, sid, success=False, failure_reason="UNKNOWN", profile=None): 932 def _killSession(self, sid, client, failure_reason=None):
508 """Delete an current_stream id, clean up associated observers 933 """Delete a current_stream id, clean up associated observers
509 @param sid: id of client.xep_0065_current_stream""" 934
510 assert(profile) 935 @param sid(unicode): session id
511 client = self.host.getClient(profile) 936 @param client: %(doc_client)s
512 if sid not in client.xep_0065_current_stream: 937 @param failure_reason(None, unicode): if None the session is successful
938 else, will be used to call failure_cb
939 """
940 try:
941 session = client.xep_0065_current_stream[sid]
942 except KeyError:
513 log.warning(_("kill id called on a non existant id")) 943 log.warning(_("kill id called on a non existant id"))
514 return 944 return
515 if "observer_cb" in client.xep_0065_current_stream[sid]: 945
516 xmlstream = client.xep_0065_current_stream[sid]["xmlstream"] 946 try:
517 xmlstream.removeObserver(client.xep_0065_current_stream[sid]["event_data"], client.xep_0065_current_stream[sid]["observer_cb"]) 947 observer_cb = session['observer_cb']
518 if client.xep_0065_current_stream[sid]['timer'].active(): 948 except KeyError:
519 client.xep_0065_current_stream[sid]['timer'].cancel() 949 pass
520 if "size" in client.xep_0065_current_stream[sid]: 950 else:
521 self.host.removeProgressCB(sid, profile) 951 client.xmlstream.removeObserver(session["event_data"], observer_cb)
522 952
523 file_obj = client.xep_0065_current_stream[sid]['file_obj'] 953 if session['timer'].active():
524 success_cb = client.xep_0065_current_stream[sid]['success_cb'] 954 session['timer'].cancel()
525 failure_cb = client.xep_0065_current_stream[sid]['failure_cb'] 955
526
527 session_hash = client.xep_0065_current_stream[sid].get('hash')
528 del client.xep_0065_current_stream[sid] 956 del client.xep_0065_current_stream[sid]
529 if session_hash in self.hash_sid_map: 957
530 #FIXME: check that self.hash_sid_map is correctly cleaned in all cases (timeout, normal flow, etc). 958 # FIXME: to check
531 del self.hash_sid_map[session_hash] 959 try:
960 session_hash = session.get['hash']
961 del self.hash_profiles_map[session_hash]
962 # FIXME: check that self.hash_profiles_map is correctly cleaned in all cases (timeout, normal flow, etc).
963 except KeyError:
964 log.debug(u"Not hash found for this session")
965 pass
966
967 success = failure_reason is None
968 stream_d = session[DEFER_KEY]
532 969
533 if success: 970 if success:
534 success_cb(sid, file_obj, NS_BS, profile) 971 stream_d.callback(None)
535 else: 972 else:
536 failure_cb(sid, file_obj, NS_BS, failure_reason, profile) 973 stream_d.errback(failure.Failure(exceptions.DataError(failure_reason)))
537 974
538 def startStream(self, file_obj, to_jid, sid, length, successCb, failureCb, size=None, profile=None): 975 def startStream(self, file_obj, to_jid, sid, profile=C.PROF_KEY_NONE):
539 """Launch the stream workflow 976 """Launch the stream workflow
977
540 @param file_obj: file_obj to send 978 @param file_obj: file_obj to send
541 @param to_jid: JID of the recipient 979 @param to_jid: JID of the recipient
542 @param sid: Stream session id 980 @param sid: Stream session id
543 @param length: number of byte to send, or None to send until the end
544 @param successCb: method to call when stream successfuly finished 981 @param successCb: method to call when stream successfuly finished
545 @param failureCb: method to call when something goes wrong 982 @param failureCb: method to call when something goes wrong
546 @param profile: %(doc_profile)s""" 983 @param profile: %(doc_profile)s
547 assert(profile) 984 """
548 client = self.host.getClient(profile) 985 client = self.host.getClient(profile)
549 986 session_data = self._createSession(file_obj, to_jid, sid, client.profile)
550 if length is not None: 987
551 log.error(_('stream length not managed yet')) 988 session_data["to"] = to_jid
552 return 989 session_data["xmlstream"] = client.xmlstream
553 990 hash_ = session_data["hash"] = getSessionHash(client.jid, to_jid, sid)
554 profile_jid = client.jid 991
555 xmlstream = client.xmlstream 992 self.hash_profiles_map[hash_] = (sid, profile)
556 993
557 data = client.xep_0065_current_stream[sid] = {} 994 iq_elt = jabber_client.IQ(client.xmlstream, 'set')
558 data["timer"] = reactor.callLater(TIMEOUT, self._timeOut, sid, profile) 995 iq_elt["from"] = client.jid.full()
559 data["file_obj"] = file_obj
560 data["from"] = profile_jid
561 data["to"] = to_jid
562 data["success_cb"] = successCb
563 data["failure_cb"] = failureCb
564 data["xmlstream"] = xmlstream
565 data["hash"] = calculateHash(profile_jid, to_jid, sid)
566 self.hash_sid_map[data["hash"]] = (sid, profile)
567 if size:
568 data["size"] = size
569 self.host.registerProgressCB(sid, self.getProgress, profile)
570 iq_elt = jabber_client.IQ(xmlstream, 'set')
571 iq_elt["from"] = profile_jid.full()
572 iq_elt["to"] = to_jid.full() 996 iq_elt["to"] = to_jid.full()
573 query_elt = iq_elt.addElement('query', NS_BS) 997 query_elt = iq_elt.addElement('query', NS_BS)
574 query_elt['mode'] = 'tcp' 998 query_elt['mode'] = 'tcp'
575 query_elt['sid'] = sid 999 query_elt['sid'] = sid
1000
576 #first streamhost: direct connection 1001 #first streamhost: direct connection
577 streamhost = query_elt.addElement('streamhost') 1002 streamhost = query_elt.addElement('streamhost')
578 streamhost['host'] = self.host.memory.getParamA("IP", "File Transfer") 1003 streamhost['host'] = self.host.memory.getParamA("IP", "File Transfer")
579 streamhost['port'] = self.host.memory.getParamA("Port", "File Transfer") 1004 streamhost['port'] = self.host.memory.getParamA("Port", "File Transfer")
580 streamhost['jid'] = profile_jid.full() 1005 streamhost['jid'] = client.jid.full()
581 1006
582 #second streamhost: mediated connection, using proxy 1007 #second streamhost: mediated connection, using proxy
583 streamhost = query_elt.addElement('streamhost') 1008 streamhost = query_elt.addElement('streamhost')
584 streamhost['host'] = self.host.memory.getParamA("Proxy host", "File Transfer", profile_key=profile) 1009 streamhost['host'] = self.host.memory.getParamA("Proxy host", "File Transfer", profile_key=profile)
585 streamhost['port'] = self.host.memory.getParamA("Proxy port", "File Transfer", profile_key=profile) 1010 streamhost['port'] = self.host.memory.getParamA("Proxy port", "File Transfer", profile_key=profile)
586 streamhost['jid'] = self.host.memory.getParamA("Proxy", "File Transfer", profile_key=profile) 1011 streamhost['jid'] = self.host.memory.getParamA("Proxy", "File Transfer", profile_key=profile)
587 1012
588 iq_elt.addCallback(self.iqResult, sid, profile) 1013 iq_elt.addCallback(self._IQOpen, session_data, client)
589 iq_elt.send() 1014 iq_elt.send()
590 1015 return session_data[DEFER_KEY]
591 def iqResult(self, sid, profile, iq_elt): 1016
592 """Called when the result of open iq is received""" 1017 def _IQOpen(self, session_data, client, iq_elt):
1018 """Called when the result of open iq is received
1019
1020 @param session_data(dict): data of the session
1021 @param client: %(doc_client)s
1022 @param iq_elt(domish.Element): <iq> result
1023 """
1024 sid = session_data['id']
593 if iq_elt["type"] == "error": 1025 if iq_elt["type"] == "error":
594 log.warning(_("Transfer failed")) 1026 log.warning(_("Socks5 transfer failed"))
1027 # FIXME: must clean session
595 return 1028 return
596 client = self.host.getClient(profile) 1029
597 try: 1030 try:
598 data = client.xep_0065_current_stream[sid] 1031 session_data = client.xep_0065_current_stream[sid]
599 file_obj = data["file_obj"] 1032 file_obj = session_data["file_obj"]
600 timer = data["timer"] 1033 timer = session_data["timer"]
601 except KeyError: 1034 except KeyError:
602 log.error(_("Internal error, can't do transfer")) 1035 raise exceptions.InternalError
603 return 1036
604 1037 timer.reset(TIMEOUT)
605 if timer.active(): 1038
606 timer.cancel() 1039 query_elt = iq_elt.elements(NS_BS, 'query').next()
607 1040 streamhost_elts = list(query_elt.elements(NS_BS, 'streamhost-used'))
608 profile_jid, xmlstream = self.host.getJidNStream(profile) 1041
609 query_elt = iq_elt.firstChildElement()
610 streamhost_elts = filter(lambda elt: elt.name == 'streamhost-used', query_elt.elements())
611 if not streamhost_elts: 1042 if not streamhost_elts:
612 log.warning(_("No streamhost found in stream query")) 1043 log.warning(_("No streamhost found in stream query"))
1044 # FIXME: must clean session
613 return 1045 return
614 1046
1047 # FIXME: must be cleaned !
1048
615 streamhost_jid = streamhost_elts[0]['jid'] 1049 streamhost_jid = streamhost_elts[0]['jid']
616 if streamhost_jid != profile_jid.full(): 1050 if streamhost_jid != client.jid.full():
617 log.debug(_("A proxy server is used")) 1051 log.debug(_("A proxy server is used"))
618 proxy_host = self.host.memory.getParamA("Proxy host", "File Transfer", profile_key=profile) 1052 proxy_host = self.host.memory.getParamA("Proxy host", "File Transfer", profile_key=client.profile)
619 proxy_port = self.host.memory.getParamA("Proxy port", "File Transfer", profile_key=profile) 1053 proxy_port = self.host.memory.getParamA("Proxy port", "File Transfer", profile_key=client.profile)
620 proxy_jid = self.host.memory.getParamA("Proxy", "File Transfer", profile_key=profile) 1054 proxy_jid = self.host.memory.getParamA("Proxy", "File Transfer", profile_key=client.profile)
621 if proxy_jid != streamhost_jid: 1055 if proxy_jid != streamhost_jid:
622 log.warning(_("Proxy jid is not the same as in parameters, this should not happen")) 1056 log.warning(_("Proxy jid is not the same as in parameters, this should not happen"))
623 return 1057 return
624 factory = Socks5ClientFactory(client.xep_0065_current_stream, sid, None, self.activateProxyStream, lambda sid, success, profile: self._killId(sid, success, profile=profile), True, profile) 1058 factory = Socks5ClientFactory(client.xep_0065_current_stream, sid, None, self.activateProxyStream, lambda sid, success, profile: self._killSession(sid, client), True, client.profile)
625 reactor.connectTCP(proxy_host, int(proxy_port), factory) 1059 reactor.connectTCP(proxy_host, int(proxy_port), factory)
626 else: 1060 else:
627 data["start_transfer_cb"](file_obj) # We now activate the stream 1061 session_data["start_transfer_cb"](file_obj) # We now activate the stream
628 1062
629 def activateProxyStream(self, sid, iq_id, start_transfer_cb, profile): 1063 def activateProxyStream(self, sid, iq_id, start_transfer_cb, profile):
630 log.debug(_("activating stream")) 1064 log.debug(_("activating stream"))
631 client = self.host.getClient(profile) 1065 client = self.host.getClient(profile)
632 data = client.xep_0065_current_stream[sid] 1066 session_data = client.xep_0065_current_stream[sid]
633 profile_jid, xmlstream = self.host.getJidNStream(profile) 1067
634 1068 iq_elt = client.IQ(client.xmlstream, 'set')
635 iq_elt = client.IQ(xmlstream, 'set') 1069 iq_elt["from"] = client.jid.full()
636 iq_elt["from"] = profile_jid.full()
637 iq_elt["to"] = self.host.memory.getParamA("Proxy", "File Transfer", profile_key=profile) 1070 iq_elt["to"] = self.host.memory.getParamA("Proxy", "File Transfer", profile_key=profile)
638 query_elt = iq_elt.addElement('query', NS_BS) 1071 query_elt = iq_elt.addElement('query', NS_BS)
639 query_elt['sid'] = sid 1072 query_elt['sid'] = sid
640 query_elt.addElement('activate', content=data['to'].full()) 1073 query_elt.addElement('activate', content=session_data['to'].full())
641 iq_elt.addCallback(self.proxyResult, sid, start_transfer_cb, data['file_obj']) 1074 iq_elt.addCallback(self.proxyResult, sid, start_transfer_cb, session_data['file_obj'])
642 iq_elt.send() 1075 iq_elt.send()
643 1076
644 def proxyResult(self, sid, start_transfer_cb, file_obj, iq_elt): 1077 def proxyResult(self, sid, start_transfer_cb, file_obj, iq_elt):
645 if iq_elt['type'] == 'error': 1078 if iq_elt['type'] == 'error':
646 log.warning(_("Can't activate the proxy stream")) 1079 log.warning(_("Can't activate the proxy stream"))
647 return 1080 return
648 else: 1081 else:
649 start_transfer_cb(file_obj) 1082 start_transfer_cb(file_obj)
650 1083
651 def prepareToReceive(self, from_jid, sid, file_obj, size, success_cb, failure_cb, profile): 1084 def createSession(self, *args, **kwargs):
1085 """like [_createSession] but return the session deferred instead of the whole session
1086
1087 session deferred is fired when transfer is finished
1088 """
1089 return self._createSession(*args, **kwargs)[DEFER_KEY]
1090
1091 def _createSession(self, file_obj, to_jid, sid, profile):
652 """Called when a bytestream is imminent 1092 """Called when a bytestream is imminent
653 @param from_jid: jid of the sender 1093
654 @param sid: Stream id 1094 @param file_obj(file): File object where data will be written
655 @param file_obj: File object where data will be written 1095 @param to_jid(jid.JId): jid of the other peer
656 @param size: full size of the data, or None if unknown 1096 @param sid(unicode): session id
657 @param success_cb: method to call when successfuly finished 1097 @param profile: %(doc_profile)s
658 @param failure_cb: method to call when something goes wrong 1098 @return (dict): session data
659 @param profile: %(doc_profile)s""" 1099 """
660 client = self.host.getClient(profile) 1100 client = self.host.getClient(profile)
661 data = client.xep_0065_current_stream[sid] = {} 1101 if sid in client.xep_0065_current_stream:
662 data["from"] = from_jid 1102 raise exceptions.ConflictError(u'A session with this id already exists !')
663 data["file_obj"] = file_obj 1103 session_data = client.xep_0065_current_stream[sid] = \
664 data["seq"] = -1 1104 {'id': sid,
665 if size: 1105 DEFER_KEY: defer.Deferred(),
666 data["size"] = size 1106 'to': to_jid,
667 self.host.registerProgressCB(sid, self.getProgress, profile) 1107 'file_obj': file_obj,
668 data["timer"] = reactor.callLater(TIMEOUT, self._timeOut, sid, profile) 1108 'seq': -1, # FIXME: to check
669 data["success_cb"] = success_cb 1109 'timer': reactor.callLater(TIMEOUT, self._timeOut, sid, client),
670 data["failure_cb"] = failure_cb 1110 }
1111
1112 return session_data
1113
1114 def getSession(self, session_hash, profile):
1115 """Return session data
1116
1117 @param session_hash(unicode): hash of the session
1118 hash is the same as hostname computer in XEP-0065 § 5.3.2 #1
1119 @param profile(None, unicode): profile of the peer
1120 None is used only if profile is unknown (this is only the case
1121 for incoming request received by Socks5ServerFactory). None must
1122 only be used by Socks5ServerFactory.
1123 See comments below for details
1124 @return (dict): session data
1125 """
1126 if profile is None:
1127 try:
1128 profile = self.hash_profiles_map[session_hash]
1129 except KeyError as e:
1130 log.warning(u"The requested session doesn't exists !")
1131 raise e
1132 client = self.host.getClient(profile)
1133 return client._s5b_sessions[session_hash]
1134
1135 def registerHash(self, *args, **kwargs):
1136 """like [_registerHash] but resutrn the session deferred instead of the whole session
1137 session deferred is fired when transfer is finished
1138 """
1139 return self._registerHash(*args, **kwargs)[DEFER_KEY]
1140
1141 def _registerHash(self, session_hash, file_obj, profile):
1142 """Create a session_data associated to hash
1143
1144 @param session_hash(str): hash of the session
1145 @param file_obj(file): file-like object
1146 @param profile: %(doc_profile)s
1147 return (dict): session data
1148 """
1149 client = self.host.getClient(profile)
1150 assert session_hash not in client._s5b_sessions
1151 session_data = client._s5b_sessions[session_hash] = {
1152 "file": file_obj,
1153 DEFER_KEY: defer.Deferred(),
1154 }
1155 if session_hash in self.hash_profiles_map:
1156 # The only case when 2 profiles want to register the same hash
1157 # is when they are on the same instance
1158 log.info(u"Both Socks5 peers are on the same instance")
1159 # XXX:If both peers are on the same instance, they'll register the same
1160 # session_hash, so we'll have 2 profiles for the same hash. The first
1161 # one will be the responder (and so the second one the initiator).
1162 # As we'll keep the initiator choosed candidate (see XEP-0260 § 2.4 #4),
1163 # responder will handle the Socks5 server. Only the server will use
1164 # self.hash_profiles_map to get the profile, so we can ignore the second
1165 # one (the initiator profile).
1166 # There is no easy way to known if the incoming connection
1167 # to the Socks5Server is from initiator or responder, so this seams a
1168 # reasonable workaround.
1169 else:
1170 self.hash_profiles_map[session_hash] = profile
1171
1172 return session_data
671 1173
672 def streamQuery(self, iq_elt, profile): 1174 def streamQuery(self, iq_elt, profile):
673 """Get file using byte stream""" 1175 """Get file using byte stream"""
674 log.debug(_("BS stream query")) 1176 log.debug(_("BS stream query"))
675 client = self.host.getClient(profile) 1177 client = self.host.getClient(profile)
708 return 1210 return
709 1211
710 client.xep_0065_current_stream[sid]["streamhost"] = (sh_host, sh_port, sh_jid) 1212 client.xep_0065_current_stream[sid]["streamhost"] = (sh_host, sh_port, sh_jid)
711 1213
712 log.info(_("Stream proposed: host=[%(host)s] port=[%(port)s]") % {'host': sh_host, 'port': sh_port}) 1214 log.info(_("Stream proposed: host=[%(host)s] port=[%(port)s]") % {'host': sh_host, 'port': sh_port})
713 factory = Socks5ClientFactory(client.xep_0065_current_stream, sid, iq_elt["id"], self.activateStream, lambda sid, success, profile: self._killId(sid, success, profile=profile), profile=profile) 1215 factory = Socks5ClientFactory(client.xep_0065_current_stream, sid, iq_elt["id"], self.activateStream, lambda sid, success, profile: self._killSession(sid, client), profile=profile)
714 reactor.connectTCP(sh_host, int(sh_port), factory) 1216 reactor.connectTCP(sh_host, int(sh_port), factory)
715 1217
716 def activateStream(self, sid, iq_id, profile): 1218 def activateStream(self, sid, iq_id, profile):
717 client = self.host.getClient(profile) 1219 client = self.host.getClient(profile)
718 log.debug(_("activating stream")) 1220 log.debug(_("activating stream"))
719 result = domish.Element((None, 'iq')) 1221 result = domish.Element((None, 'iq'))
720 data = client.xep_0065_current_stream[sid] 1222 session_data = client.xep_0065_current_stream[sid]
721 result['type'] = 'result' 1223 result['type'] = 'result'
722 result['id'] = iq_id 1224 result['id'] = iq_id
723 result['from'] = data["to"].full() 1225 result['from'] = session_data["to"].full()
724 result['to'] = data["from"].full() 1226 result['to'] = session_data["from"].full()
725 query = result.addElement('query', NS_BS) 1227 query = result.addElement('query', NS_BS)
726 query['sid'] = sid 1228 query['sid'] = sid
727 streamhost = query.addElement('streamhost-used') 1229 streamhost = query.addElement('streamhost-used')
728 streamhost['jid'] = data["streamhost"][2] 1230 streamhost['jid'] = session_data["streamhost"][2]
729 data["xmlstream"].send(result) 1231 session_data["xmlstream"].send(result)
730 1232
731 def sendNotAcceptableError(self, iq_id, to_jid, xmlstream): 1233 def sendNotAcceptableError(self, iq_id, to_jid, xmlstream):
732 """Not acceptable error used when the stream is not expected or something is going wrong 1234 """Not acceptable error used when the stream is not expected or something is going wrong
733 @param iq_id: IQ id 1235 @param iq_id: IQ id
734 @param to_jid: addressee 1236 @param to_jid: addressee
762 1264
763 def __init__(self, plugin_parent): 1265 def __init__(self, plugin_parent):
764 self.plugin_parent = plugin_parent 1266 self.plugin_parent = plugin_parent
765 self.host = plugin_parent.host 1267 self.host = plugin_parent.host
766 1268
767 def _proxyDataResult(self, iq_elt):
768 """Called with the information about proxy according to XEP-0065 #4
769 Params should be filled with these infos"""
770 if iq_elt["type"] == "error":
771 log.warning(_("Can't determine proxy information"))
772 return
773 query_elt = iq_elt.firstChildElement()
774 if query_elt.name != "query":
775 log.warning(_("Bad answer received from proxy"))
776 return
777 streamhost_elts = filter(lambda elt: elt.name == 'streamhost', query_elt.elements())
778 if not streamhost_elts:
779 log.warning(_("No streamhost found in stream query"))
780 return
781 if len(streamhost_elts) != 1:
782 log.warning(_("Multiple streamhost elements in proxy not managed, keeping only the first one"))
783 streamhost_elt = streamhost_elts[0]
784 self.host.memory.setParam("Proxy", streamhost_elt.getAttribute("jid", ""),
785 "File Transfer", profile_key=self.parent.profile)
786 self.host.memory.setParam("Proxy host", streamhost_elt.getAttribute("host", ""),
787 "File Transfer", profile_key=self.parent.profile)
788 self.host.memory.setParam("Proxy port", streamhost_elt.getAttribute("port", ""),
789 "File Transfer", profile_key=self.parent.profile)
790
791 def connectionInitialized(self): 1269 def connectionInitialized(self):
792 def connection_ok(dummy): 1270 self.xmlstream.addObserver(BS_REQUEST, self.plugin_parent.streamQuery, profile=self.parent.profile)
793 self.xmlstream.addObserver(BS_REQUEST, self.plugin_parent.streamQuery, profile=self.parent.profile)
794 proxy = self.host.memory.getParamA("Proxy", "File Transfer", profile_key=self.parent.profile)
795 if not proxy:
796 def proxiesFound(entities):
797 try:
798 proxy_ent = entities.pop()
799 except KeyError:
800 log.info(_("No proxy found on this server"))
801 return
802 iq_elt = jabber_client.IQ(self.parent.xmlstream, 'get')
803 iq_elt["to"] = proxy_ent.full()
804 iq_elt.addElement('query', NS_BS)
805 iq_elt.addCallback(self._proxyDataResult)
806 iq_elt.send()
807 d = self.host.findServiceEntities("proxy", "bytestreams", profile_key=self.parent.profile)
808 d.addCallback(proxiesFound)
809 self.parent.getConnectionDeferred().addCallback(connection_ok)
810
811 1271
812 def getDiscoInfo(self, requestor, target, nodeIdentifier=''): 1272 def getDiscoInfo(self, requestor, target, nodeIdentifier=''):
813 return [disco.DiscoFeature(NS_BS)] 1273 return [disco.DiscoFeature(NS_BS)]
814 1274
815 def getDiscoItems(self, requestor, target, nodeIdentifier=''): 1275 def getDiscoItems(self, requestor, target, nodeIdentifier=''):