Mercurial > libervia-backend
comparison sat/plugins/plugin_xep_0065.py @ 2562:26edcf3a30eb
core, setup: huge cleaning:
- moved directories from src and frontends/src to sat and sat_frontends, which is the recommanded naming convention
- move twisted directory to root
- removed all hacks from setup.py, and added missing dependencies, it is now clean
- use https URL for website in setup.py
- removed "Environment :: X11 Applications :: GTK", as wix is deprecated and removed
- renamed sat.sh to sat and fixed its installation
- added python_requires to specify Python version needed
- replaced glib2reactor which use deprecated code by gtk3reactor
sat can now be installed directly from virtualenv without using --system-site-packages anymore \o/
author | Goffi <goffi@goffi.org> |
---|---|
date | Mon, 02 Apr 2018 19:44:50 +0200 |
parents | src/plugins/plugin_xep_0065.py@7ad5f2c4e34a |
children | 56f94936df1e |
comparison
equal
deleted
inserted
replaced
2561:bd30dc3ffe5a | 2562:26edcf3a30eb |
---|---|
1 #!/usr/bin/env python2 | |
2 #-*- coding: utf-8 -*- | |
3 | |
4 # SAT plugin for managing xep-0065 | |
5 | |
6 # Copyright (C) | |
7 # 2002, 2003, 2004 Dave Smith (dizzyd@jabber.org) | |
8 # 2007, 2008 Fabio Forno (xmpp:ff@jabber.bluendo.com) | |
9 # 2009-2018 Jérôme Poisson (goffi@goffi.org) | |
10 | |
11 # This program is free software: you can redistribute it and/or modify | |
12 # it under the terms of the GNU Affero General Public License as published by | |
13 # the Free Software Foundation, either version 3 of the License, or | |
14 # (at your option) any later version. | |
15 | |
16 # This program is distributed in the hope that it will be useful, | |
17 # but WITHOUT ANY WARRANTY; without even the implied warranty of | |
18 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |
19 # GNU Affero General Public License for more details. | |
20 | |
21 # You should have received a copy of the GNU Affero General Public License | |
22 # along with this program. If not, see <http://www.gnu.org/licenses/>. | |
23 | |
24 # -- | |
25 | |
26 # This module is based on proxy65 (http://code.google.com/p/proxy65), | |
27 # originaly written by David Smith and modified by Fabio Forno. | |
28 # It is sublicensed under AGPL v3 (or any later version) as allowed by the original | |
29 # license. | |
30 | |
31 # -- | |
32 | |
33 # Here is a copy of the original license: | |
34 | |
35 # Copyright (C) | |
36 # 2002-2004 Dave Smith (dizzyd@jabber.org) | |
37 # 2007-2008 Fabio Forno (xmpp:ff@jabber.bluendo.com) | |
38 | |
39 # Permission is hereby granted, free of charge, to any person obtaining a copy | |
40 # of this software and associated documentation files (the "Software"), to deal | |
41 # in the Software without restriction, including without limitation the rights | |
42 # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell | |
43 # copies of the Software, and to permit persons to whom the Software is | |
44 # furnished to do so, subject to the following conditions: | |
45 | |
46 # The above copyright notice and this permission notice shall be included in | |
47 # all copies or substantial portions of the Software. | |
48 | |
49 # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR | |
50 # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, | |
51 # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE | |
52 # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER | |
53 # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, | |
54 # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN | |
55 # THE SOFTWARE. | |
56 | |
57 from sat.core.i18n import _ | |
58 from sat.core.log import getLogger | |
59 log = getLogger(__name__) | |
60 from sat.core.constants import Const as C | |
61 from sat.core import exceptions | |
62 from sat.tools import sat_defer | |
63 from twisted.internet import protocol | |
64 from twisted.internet import reactor | |
65 from twisted.internet import error as internet_error | |
66 from twisted.words.protocols.jabber import error as jabber_error | |
67 from twisted.words.protocols.jabber import jid | |
68 from twisted.words.protocols.jabber import xmlstream | |
69 from twisted.internet import defer | |
70 from collections import namedtuple | |
71 import struct | |
72 import hashlib | |
73 import uuid | |
74 | |
75 from zope.interface import implements | |
76 | |
77 try: | |
78 from twisted.words.protocols.xmlstream import XMPPHandler | |
79 except ImportError: | |
80 from wokkel.subprotocols import XMPPHandler | |
81 | |
82 from wokkel import disco, iwokkel | |
83 | |
84 | |
85 PLUGIN_INFO = { | |
86 C.PI_NAME: "XEP 0065 Plugin", | |
87 C.PI_IMPORT_NAME: "XEP-0065", | |
88 C.PI_TYPE: "XEP", | |
89 C.PI_MODES: C.PLUG_MODE_BOTH, | |
90 C.PI_PROTOCOLS: ["XEP-0065"], | |
91 C.PI_DEPENDENCIES: ["IP"], | |
92 C.PI_RECOMMENDATIONS: ["NAT-PORT"], | |
93 C.PI_MAIN: "XEP_0065", | |
94 C.PI_HANDLER: "yes", | |
95 C.PI_DESCRIPTION: _("""Implementation of SOCKS5 Bytestreams""") | |
96 } | |
97 | |
98 IQ_SET = '/iq[@type="set"]' | |
99 NS_BS = 'http://jabber.org/protocol/bytestreams' | |
100 BS_REQUEST = IQ_SET + '/query[@xmlns="' + NS_BS + '"]' | |
101 TIMER_KEY = 'timer' | |
102 DEFER_KEY = 'finished' # key of the deferred used to track session end | |
103 SERVER_STARTING_PORT = 0 # starting number for server port search (0 to ask automatic attribution) | |
104 | |
105 # priorities are candidates local priorities, must be a int between 0 and 65535 | |
106 PRIORITY_BEST_DIRECT = 10000 | |
107 PRIORITY_DIRECT = 5000 | |
108 PRIORITY_ASSISTED = 1000 | |
109 PRIORITY_PROXY = 0.2 # proxy is the last option for s5b | |
110 CANDIDATE_DELAY = 0.2 # see XEP-0260 §4 | |
111 CANDIDATE_DELAY_PROXY = 0.2 # additional time for proxy types (see XEP-0260 §4 note 3) | |
112 | |
113 TIMEOUT = 300 # maxium time between session creation and stream start | |
114 | |
115 # XXX: by default eveything is automatic | |
116 # TODO: use these params to force use of specific proxy/port/IP | |
117 # PARAMS = """ | |
118 # <params> | |
119 # <general> | |
120 # <category name="File Transfer"> | |
121 # <param name="Force IP" type="string" /> | |
122 # <param name="Force Port" type="int" constraint="1;65535" /> | |
123 # </category> | |
124 # </general> | |
125 # <individual> | |
126 # <category name="File Transfer"> | |
127 # <param name="Force Proxy" value="" type="string" /> | |
128 # <param name="Force Proxy host" value="" type="string" /> | |
129 # <param name="Force Proxy port" value="" type="int" constraint="1;65535" /> | |
130 # </category> | |
131 # </individual> | |
132 # </params> | |
133 # """ | |
134 | |
135 (STATE_INITIAL, | |
136 STATE_AUTH, | |
137 STATE_REQUEST, | |
138 STATE_READY, | |
139 STATE_AUTH_USERPASS, | |
140 STATE_CLIENT_INITIAL, | |
141 STATE_CLIENT_AUTH, | |
142 STATE_CLIENT_REQUEST, | |
143 ) = xrange(8) | |
144 | |
145 SOCKS5_VER = 0x05 | |
146 | |
147 ADDR_IPV4 = 0x01 | |
148 ADDR_DOMAINNAME = 0x03 | |
149 ADDR_IPV6 = 0x04 | |
150 | |
151 CMD_CONNECT = 0x01 | |
152 CMD_BIND = 0x02 | |
153 CMD_UDPASSOC = 0x03 | |
154 | |
155 AUTHMECH_ANON = 0x00 | |
156 AUTHMECH_USERPASS = 0x02 | |
157 AUTHMECH_INVALID = 0xFF | |
158 | |
159 REPLY_SUCCESS = 0x00 | |
160 REPLY_GENERAL_FAILUR = 0x01 | |
161 REPLY_CONN_NOT_ALLOWED = 0x02 | |
162 REPLY_NETWORK_UNREACHABLE = 0x03 | |
163 REPLY_HOST_UNREACHABLE = 0x04 | |
164 REPLY_CONN_REFUSED = 0x05 | |
165 REPLY_TTL_EXPIRED = 0x06 | |
166 REPLY_CMD_NOT_SUPPORTED = 0x07 | |
167 REPLY_ADDR_NOT_SUPPORTED = 0x08 | |
168 | |
169 | |
170 ProxyInfos = namedtuple("ProxyInfos", ['host', 'jid', 'port']) | |
171 | |
172 | |
173 class Candidate(object): | |
174 | |
175 def __init__(self, host, port, type_, priority, jid_, id_=None, priority_local=False, factory=None): | |
176 """ | |
177 @param host(unicode): host IP or domain | |
178 @param port(int): port | |
179 @param type_(unicode): stream type (one of XEP_0065.TYPE_*) | |
180 @param priority(int): priority | |
181 @param jid_(jid.JID): jid | |
182 @param id_(None, id_): Candidate ID, or None to generate | |
183 @param priority_local(bool): if True, priority is used as local priority, | |
184 else priority is used as global one (and local priority is set to 0) | |
185 """ | |
186 assert isinstance(jid_, jid.JID) | |
187 self.host, self.port, self.type, self.jid = ( | |
188 host, int(port), type_, jid_) | |
189 self.id = id_ if id_ is not None else unicode(uuid.uuid4()) | |
190 if priority_local: | |
191 self._local_priority = int(priority) | |
192 self._priority = self.calculatePriority() | |
193 else: | |
194 self._local_priority = 0 | |
195 self._priority = int(priority) | |
196 self.factory = factory | |
197 | |
198 def discard(self): | |
199 """Disconnect a candidate if it is connected | |
200 | |
201 Used to disconnect tryed client when they are discarded | |
202 """ | |
203 log.debug(u"Discarding {}".format(self)) | |
204 try: | |
205 self.factory.discard() | |
206 except AttributeError: | |
207 pass # no discard for Socks5ServerFactory | |
208 | |
209 @property | |
210 def local_priority(self): | |
211 return self._local_priority | |
212 | |
213 @property | |
214 def priority(self): | |
215 return self._priority | |
216 | |
217 def __str__(self): | |
218 # similar to __unicode__ but we don't show jid and we encode id | |
219 return "Candidate ({0.priority}): host={0.host} port={0.port} type={0.type}{id}".format( | |
220 self, | |
221 id=u" id={}".format(self.id if self.id is not None else u'').encode('utf-8', 'ignore'), | |
222 ) | |
223 | |
224 def __unicode__(self): | |
225 return u"Candidate ({0.priority}): host={0.host} port={0.port} jid={0.jid} type={0.type}{id}".format( | |
226 self, | |
227 id=u" id={}".format(self.id if self.id is not None else u''), | |
228 ) | |
229 | |
230 def __eq__(self, other): | |
231 # self.id is is not used in __eq__ as the same candidate can have | |
232 # different ids if proposed by initiator or responder | |
233 try: | |
234 return (self.host == other.host and | |
235 self.port == other.port and | |
236 self.jid == other.jid) | |
237 except (AttributeError, TypeError): | |
238 return False | |
239 | |
240 def __ne__(self, other): | |
241 return not self.__eq__(other) | |
242 | |
243 def calculatePriority(self): | |
244 """Calculate candidate priority according to XEP-0260 §2.2 | |
245 | |
246 | |
247 @return (int): priority | |
248 """ | |
249 if self.type == XEP_0065.TYPE_DIRECT: | |
250 multiplier = 126 | |
251 elif self.type == XEP_0065.TYPE_ASSISTED: | |
252 multiplier = 120 | |
253 elif self.type == XEP_0065.TYPE_TUNEL: | |
254 multiplier = 110 | |
255 elif self.type == XEP_0065.TYPE_PROXY: | |
256 multiplier = 10 | |
257 else: | |
258 raise exceptions.InternalError(u"Unknown {} type !".format(self.type)) | |
259 return 2**16 * multiplier + self._local_priority | |
260 | |
261 def activate(self, sid, peer_jid, client): | |
262 """Activate the proxy candidate | |
263 | |
264 Send activation request as explained in XEP-0065 § 6.3.5 | |
265 Must only be used with proxy candidates | |
266 @param sid(unicode): session id (same as for getSessionHash) | |
267 @param peer_jid(jid.JID): jid of the other peer | |
268 @return (D(domish.Element)): IQ result (or error) | |
269 """ | |
270 assert self.type == XEP_0065.TYPE_PROXY | |
271 iq_elt = client.IQ() | |
272 iq_elt['to'] = self.jid.full() | |
273 query_elt = iq_elt.addElement((NS_BS, 'query')) | |
274 query_elt['sid'] = sid | |
275 query_elt.addElement('activate', content=peer_jid.full()) | |
276 return iq_elt.send() | |
277 | |
278 def startTransfer(self, session_hash=None): | |
279 if self.type == XEP_0065.TYPE_PROXY: | |
280 chunk_size = 4096 # Prosody's proxy reject bigger chunks by default | |
281 else: | |
282 chunk_size = None | |
283 self.factory.startTransfer(session_hash, chunk_size=chunk_size) | |
284 | |
285 | |
286 def getSessionHash(requester_jid, target_jid, sid): | |
287 """Calculate SHA1 Hash according to XEP-0065 §5.3.2 | |
288 | |
289 @param requester_jid(jid.JID): jid of the requester (the one which activate the proxy) | |
290 @param target_jid(jid.JID): jid of the target | |
291 @param sid(unicode): session id | |
292 @return (str): hash | |
293 """ | |
294 return hashlib.sha1((sid + requester_jid.full() + target_jid.full()).encode('utf-8')).hexdigest() | |
295 | |
296 | |
297 class SOCKSv5(protocol.Protocol): | |
298 CHUNK_SIZE = 2**16 | |
299 | |
300 def __init__(self, session_hash=None): | |
301 """ | |
302 @param session_hash(str): hash of the session | |
303 must only be used in client mode | |
304 """ | |
305 self.connection = defer.Deferred() # called when connection/auth is done | |
306 if session_hash is not None: | |
307 self.server_mode = False | |
308 self._session_hash = session_hash | |
309 self.state = STATE_CLIENT_INITIAL | |
310 else: | |
311 self.server_mode = True | |
312 self.state = STATE_INITIAL | |
313 self.buf = "" | |
314 self.supportedAuthMechs = [AUTHMECH_ANON] | |
315 self.supportedAddrs = [ADDR_DOMAINNAME] | |
316 self.enabledCommands = [CMD_CONNECT] | |
317 self.peersock = None | |
318 self.addressType = 0 | |
319 self.requestType = 0 | |
320 self._stream_object = None | |
321 self.active = False # set to True when protocol is actually used for transfer | |
322 # used by factories to know when the finished Deferred can be triggered | |
323 | |
324 @property | |
325 def stream_object(self): | |
326 if self._stream_object is None: | |
327 self._stream_object = self.getSession()['stream_object'] | |
328 if self.server_mode: | |
329 self._stream_object.registerProducer(self.transport, True) | |
330 return self._stream_object | |
331 | |
332 def getSession(self): | |
333 """Return session associated with this candidate | |
334 | |
335 @return (dict): session data | |
336 """ | |
337 if self.server_mode: | |
338 return self.factory.getSession(self._session_hash) | |
339 else: | |
340 return self.factory.getSession() | |
341 | |
342 def _startNegotiation(self): | |
343 log.debug("starting negotiation (client mode)") | |
344 self.state = STATE_CLIENT_AUTH | |
345 self.transport.write(struct.pack('!3B', SOCKS5_VER, 1, AUTHMECH_ANON)) | |
346 | |
347 def _parseNegotiation(self): | |
348 try: | |
349 # Parse out data | |
350 ver, nmethod = struct.unpack('!BB', self.buf[:2]) | |
351 methods = struct.unpack('%dB' % nmethod, self.buf[2:nmethod + 2]) | |
352 | |
353 # Ensure version is correct | |
354 if ver != 5: | |
355 self.transport.write(struct.pack('!BB', SOCKS5_VER, AUTHMECH_INVALID)) | |
356 self.transport.loseConnection() | |
357 return | |
358 | |
359 # Trim off front of the buffer | |
360 self.buf = self.buf[nmethod + 2:] | |
361 | |
362 # Check for supported auth mechs | |
363 for m in self.supportedAuthMechs: | |
364 if m in methods: | |
365 # Update internal state, according to selected method | |
366 if m == AUTHMECH_ANON: | |
367 self.state = STATE_REQUEST | |
368 elif m == AUTHMECH_USERPASS: | |
369 self.state = STATE_AUTH_USERPASS | |
370 # Complete negotiation w/ this method | |
371 self.transport.write(struct.pack('!BB', SOCKS5_VER, m)) | |
372 return | |
373 | |
374 # No supported mechs found, notify client and close the connection | |
375 log.warning(u"Unsupported authentication mechanism") | |
376 self.transport.write(struct.pack('!BB', SOCKS5_VER, AUTHMECH_INVALID)) | |
377 self.transport.loseConnection() | |
378 except struct.error: | |
379 pass | |
380 | |
381 def _parseUserPass(self): | |
382 try: | |
383 # Parse out data | |
384 ver, ulen = struct.unpack('BB', self.buf[:2]) | |
385 uname, = struct.unpack('%ds' % ulen, self.buf[2:ulen + 2]) | |
386 plen, = struct.unpack('B', self.buf[ulen + 2]) | |
387 password, = struct.unpack('%ds' % plen, self.buf[ulen + 3:ulen + 3 + plen]) | |
388 # Trim off fron of the buffer | |
389 self.buf = self.buf[3 + ulen + plen:] | |
390 # Fire event to authenticate user | |
391 if self.authenticateUserPass(uname, password): | |
392 # Signal success | |
393 self.state = STATE_REQUEST | |
394 self.transport.write(struct.pack('!BB', SOCKS5_VER, 0x00)) | |
395 else: | |
396 # Signal failure | |
397 self.transport.write(struct.pack('!BB', SOCKS5_VER, 0x01)) | |
398 self.transport.loseConnection() | |
399 except struct.error: | |
400 pass | |
401 | |
402 def sendErrorReply(self, errorcode): | |
403 # Any other address types are not supported | |
404 result = struct.pack('!BBBBIH', SOCKS5_VER, errorcode, 0, 1, 0, 0) | |
405 self.transport.write(result) | |
406 self.transport.loseConnection() | |
407 | |
408 def _parseRequest(self): | |
409 try: | |
410 # Parse out data and trim buffer accordingly | |
411 ver, cmd, rsvd, self.addressType = struct.unpack('!BBBB', self.buf[:4]) | |
412 | |
413 # Ensure we actually support the requested address type | |
414 if self.addressType not in self.supportedAddrs: | |
415 self.sendErrorReply(REPLY_ADDR_NOT_SUPPORTED) | |
416 return | |
417 | |
418 # Deal with addresses | |
419 if self.addressType == ADDR_IPV4: | |
420 addr, port = struct.unpack('!IH', self.buf[4:10]) | |
421 self.buf = self.buf[10:] | |
422 elif self.addressType == ADDR_DOMAINNAME: | |
423 nlen = ord(self.buf[4]) | |
424 addr, port = struct.unpack('!%dsH' % nlen, self.buf[5:]) | |
425 self.buf = self.buf[7 + len(addr):] | |
426 else: | |
427 # Any other address types are not supported | |
428 self.sendErrorReply(REPLY_ADDR_NOT_SUPPORTED) | |
429 return | |
430 | |
431 # Ensure command is supported | |
432 if cmd not in self.enabledCommands: | |
433 # Send a not supported error | |
434 self.sendErrorReply(REPLY_CMD_NOT_SUPPORTED) | |
435 return | |
436 | |
437 # Process the command | |
438 if cmd == CMD_CONNECT: | |
439 self.connectRequested(addr, port) | |
440 elif cmd == CMD_BIND: | |
441 self.bindRequested(addr, port) | |
442 else: | |
443 # Any other command is not supported | |
444 self.sendErrorReply(REPLY_CMD_NOT_SUPPORTED) | |
445 | |
446 except struct.error: | |
447 # The buffer is probably not complete, we need to wait more | |
448 return None | |
449 | |
450 def _makeRequest(self): | |
451 hash_ = self._session_hash | |
452 request = struct.pack('!5B%dsH' % len(hash_), SOCKS5_VER, CMD_CONNECT, 0, ADDR_DOMAINNAME, len(hash_), hash_, 0) | |
453 self.transport.write(request) | |
454 self.state = STATE_CLIENT_REQUEST | |
455 | |
456 def _parseRequestReply(self): | |
457 try: | |
458 ver, rep, rsvd, self.addressType = struct.unpack('!BBBB', self.buf[:4]) | |
459 # Ensure we actually support the requested address type | |
460 if self.addressType not in self.supportedAddrs: | |
461 self.sendErrorReply(REPLY_ADDR_NOT_SUPPORTED) | |
462 return | |
463 | |
464 # Deal with addresses | |
465 if self.addressType == ADDR_IPV4: | |
466 addr, port = struct.unpack('!IH', self.buf[4:10]) | |
467 self.buf = self.buf[10:] | |
468 elif self.addressType == ADDR_DOMAINNAME: | |
469 nlen = ord(self.buf[4]) | |
470 addr, port = struct.unpack('!%dsH' % nlen, self.buf[5:]) | |
471 self.buf = self.buf[7 + len(addr):] | |
472 else: | |
473 # Any other address types are not supported | |
474 self.sendErrorReply(REPLY_ADDR_NOT_SUPPORTED) | |
475 return | |
476 | |
477 # Ensure reply is OK | |
478 if rep != REPLY_SUCCESS: | |
479 self.loseConnection() | |
480 return | |
481 | |
482 self.state = STATE_READY | |
483 self.connection.callback(None) | |
484 | |
485 except struct.error: | |
486 # The buffer is probably not complete, we need to wait more | |
487 return None | |
488 | |
489 def connectionMade(self): | |
490 log.debug(u"Socks5 connectionMade (mode = {})".format("server" if self.state == STATE_INITIAL else "client")) | |
491 if self.state == STATE_CLIENT_INITIAL: | |
492 self._startNegotiation() | |
493 | |
494 def connectRequested(self, addr, port): | |
495 # Check that this session is expected | |
496 if not self.factory.addToSession(addr, self): | |
497 self.sendErrorReply(REPLY_CONN_REFUSED) | |
498 log.warning(u"Unexpected connection request received from {host}" | |
499 .format(host=self.transport.getPeer().host)) | |
500 return | |
501 self._session_hash = addr | |
502 self.connectCompleted(addr, 0) | |
503 | |
504 def startTransfer(self, chunk_size): | |
505 """Callback called when the result iq is received | |
506 | |
507 @param chunk_size(None, int): size of the buffer, or None for default | |
508 """ | |
509 self.active = True | |
510 if chunk_size is not None: | |
511 self.CHUNK_SIZE = chunk_size | |
512 log.debug(u"Starting file transfer") | |
513 d = self.stream_object.startStream(self.transport) | |
514 d.addCallback(self.streamFinished) | |
515 | |
516 def streamFinished(self, d): | |
517 log.info(_("File transfer completed, closing connection")) | |
518 self.transport.loseConnection() | |
519 | |
520 def connectCompleted(self, remotehost, remoteport): | |
521 if self.addressType == ADDR_IPV4: | |
522 result = struct.pack('!BBBBIH', SOCKS5_VER, REPLY_SUCCESS, 0, 1, remotehost, remoteport) | |
523 elif self.addressType == ADDR_DOMAINNAME: | |
524 result = struct.pack('!BBBBB%dsH' % len(remotehost), SOCKS5_VER, REPLY_SUCCESS, 0, | |
525 ADDR_DOMAINNAME, len(remotehost), remotehost, remoteport) | |
526 self.transport.write(result) | |
527 self.state = STATE_READY | |
528 | |
529 def bindRequested(self, addr, port): | |
530 pass | |
531 | |
532 def authenticateUserPass(self, user, passwd): | |
533 # FIXME: implement authentication and remove the debug printing a password | |
534 log.debug(u"User/pass: %s/%s" % (user, passwd)) | |
535 return True | |
536 | |
537 def dataReceived(self, buf): | |
538 if self.state == STATE_READY: | |
539 # Everything is set, we just have to write the incoming data | |
540 self.stream_object.write(buf) | |
541 if not self.active: | |
542 self.active = True | |
543 self.getSession()[TIMER_KEY].cancel() | |
544 return | |
545 | |
546 self.buf = self.buf + buf | |
547 if self.state == STATE_INITIAL: | |
548 self._parseNegotiation() | |
549 if self.state == STATE_AUTH_USERPASS: | |
550 self._parseUserPass() | |
551 if self.state == STATE_REQUEST: | |
552 self._parseRequest() | |
553 if self.state == STATE_CLIENT_REQUEST: | |
554 self._parseRequestReply() | |
555 if self.state == STATE_CLIENT_AUTH: | |
556 ver, method = struct.unpack('!BB', buf) | |
557 self.buf = self.buf[2:] | |
558 if ver != SOCKS5_VER or method != AUTHMECH_ANON: | |
559 self.transport.loseConnection() | |
560 else: | |
561 self._makeRequest() | |
562 | |
563 def connectionLost(self, reason): | |
564 log.debug(u"Socks5 connection lost: {}".format(reason.value)) | |
565 if self.state != STATE_READY: | |
566 self.connection.errback(reason) | |
567 if self.server_mode : | |
568 self.factory.removeFromSession(self._session_hash, self, reason) | |
569 | |
570 | |
571 class Socks5ServerFactory(protocol.ServerFactory): | |
572 protocol = SOCKSv5 | |
573 | |
574 def __init__(self, parent): | |
575 """ | |
576 @param parent(XEP_0065): XEP_0065 parent instance | |
577 """ | |
578 self.parent = parent | |
579 | |
580 def getSession(self, session_hash): | |
581 return self.parent.getSession(None, session_hash) | |
582 | |
583 def startTransfer(self, session_hash, chunk_size=None): | |
584 session = self.getSession(session_hash) | |
585 try: | |
586 protocol = session['protocols'][0] | |
587 except (KeyError, IndexError): | |
588 log.error(u"Can't start file transfer, can't find protocol") | |
589 else: | |
590 session[TIMER_KEY].cancel() | |
591 protocol.startTransfer(chunk_size) | |
592 | |
593 def addToSession(self, session_hash, protocol): | |
594 """Check is session_hash is valid, and associate protocol with it | |
595 | |
596 the session will be associated to the corresponding candidate | |
597 @param session_hash(str): hash of the session | |
598 @param protocol(SOCKSv5): protocol instance | |
599 @param return(bool): True if hash was valid (i.e. expected), False else | |
600 """ | |
601 try: | |
602 session_data = self.getSession(session_hash) | |
603 except KeyError: | |
604 return False | |
605 else: | |
606 session_data.setdefault('protocols', []).append(protocol) | |
607 return True | |
608 | |
609 def removeFromSession(self, session_hash, protocol, reason): | |
610 """Remove a protocol from session_data | |
611 | |
612 There can be several protocol instances while candidates are tried, they | |
613 have removed when candidate connection is closed | |
614 @param session_hash(str): hash of the session | |
615 @param protocol(SOCKSv5): protocol instance | |
616 @param reason(failure.Failure): reason of the removal | |
617 """ | |
618 try: | |
619 protocols = self.getSession(session_hash)['protocols'] | |
620 protocols.remove(protocol) | |
621 except (KeyError, ValueError): | |
622 log.error(u"Protocol not found in session while it should be there") | |
623 else: | |
624 if protocol.active: | |
625 # The active protocol has been removed, session is finished | |
626 if reason.check(internet_error.ConnectionDone): | |
627 self.getSession(session_hash)[DEFER_KEY].callback(None) | |
628 else: | |
629 self.getSession(session_hash)[DEFER_KEY].errback(reason) | |
630 | |
631 | |
632 class Socks5ClientFactory(protocol.ClientFactory): | |
633 protocol = SOCKSv5 | |
634 | |
635 def __init__(self, client, parent, session, session_hash): | |
636 """Init the Client Factory | |
637 | |
638 @param session(dict): session data | |
639 @param session_hash(unicode): hash used for peer_connection | |
640 hash is the same as hostname computed in XEP-0065 § 5.3.2 #1 | |
641 """ | |
642 self.session = session | |
643 self.session_hash = session_hash | |
644 self.client = client | |
645 self.connection = defer.Deferred() | |
646 self._protocol_instance = None | |
647 self.connector = None | |
648 | |
649 def discard(self): | |
650 """Disconnect the client | |
651 | |
652 Also set a discarded flag, which avoid to call the session Deferred | |
653 """ | |
654 self.connector.disconnect() | |
655 | |
656 def getSession(self): | |
657 return self.session | |
658 | |
659 def startTransfer(self, dummy=None, chunk_size=None): | |
660 self.session[TIMER_KEY].cancel() | |
661 self._protocol_instance.startTransfer(chunk_size) | |
662 | |
663 def clientConnectionFailed(self, connector, reason): | |
664 log.debug(u"Connection failed") | |
665 self.connection.errback(reason) | |
666 | |
667 def clientConnectionLost(self, connector, reason): | |
668 log.debug(_(u"Socks 5 client connection lost (reason: %s)") % reason.value) | |
669 if self._protocol_instance.active: | |
670 # This one was used for the transfer, than mean that | |
671 # the Socks5 session is finished | |
672 if reason.check(internet_error.ConnectionDone): | |
673 self.getSession()[DEFER_KEY].callback(None) | |
674 else: | |
675 self.getSession()[DEFER_KEY].errback(reason) | |
676 self._protocol_instance = None | |
677 | |
678 def buildProtocol(self, addr): | |
679 log.debug(("Socks 5 client connection started")) | |
680 p = self.protocol(session_hash=self.session_hash) | |
681 p.factory = self | |
682 p.connection.chainDeferred(self.connection) | |
683 self._protocol_instance = p | |
684 return p | |
685 | |
686 | |
687 class XEP_0065(object): | |
688 NAMESPACE = NS_BS | |
689 TYPE_DIRECT = 'direct' | |
690 TYPE_ASSISTED = 'assisted' | |
691 TYPE_TUNEL = 'tunel' | |
692 TYPE_PROXY = 'proxy' | |
693 Candidate = Candidate | |
694 | |
695 def __init__(self, host): | |
696 log.info(_("Plugin XEP_0065 initialization")) | |
697 self.host = host | |
698 | |
699 # session data | |
700 self.hash_clients_map = {} # key: hash of the transfer session, value: session data | |
701 self._cache_proxies = {} # key: server jid, value: proxy data | |
702 | |
703 # misc data | |
704 self._server_factory = None | |
705 self._external_port = None | |
706 | |
707 # plugins shortcuts | |
708 self._ip = self.host.plugins['IP'] | |
709 try: | |
710 self._np = self.host.plugins['NAT-PORT'] | |
711 except KeyError: | |
712 log.debug(u"NAT Port plugin not available") | |
713 self._np = None | |
714 | |
715 # parameters | |
716 # XXX: params are not used for now, but they may be used in the futur to force proxy/IP | |
717 # host.memory.updateParams(PARAMS) | |
718 | |
719 def getHandler(self, client): | |
720 return XEP_0065_handler(self) | |
721 | |
722 def profileConnected(self, client): | |
723 client.xep_0065_sid_session = {} # key: stream_id, value: session_data(dict) | |
724 client._s5b_sessions = {} | |
725 | |
726 def getSessionHash(self, from_jid, to_jid, sid): | |
727 return getSessionHash(from_jid, to_jid, sid) | |
728 | |
729 def getSocks5ServerFactory(self): | |
730 """Return server factory | |
731 | |
732 The server is created if it doesn't exists yet | |
733 self._server_factory_port is set on server creation | |
734 """ | |
735 | |
736 if self._server_factory is None: | |
737 self._server_factory = Socks5ServerFactory(self) | |
738 for port in xrange(SERVER_STARTING_PORT, 65356): | |
739 try: | |
740 listening_port = reactor.listenTCP(port, self._server_factory) | |
741 except internet_error.CannotListenError as e: | |
742 log.debug(u"Cannot listen on port {port}: {err_msg}{err_num}".format( | |
743 port=port, | |
744 err_msg=e.socketError.strerror, | |
745 err_num=u' (error code: {})'.format(e.socketError.errno), | |
746 )) | |
747 else: | |
748 self._server_factory_port = listening_port.getHost().port | |
749 break | |
750 | |
751 log.info(_("Socks5 Stream server launched on port {}").format(self._server_factory_port)) | |
752 return self._server_factory | |
753 | |
754 @defer.inlineCallbacks | |
755 def getProxy(self, client): | |
756 """Return the proxy available for this profile | |
757 | |
758 cache is used between clients using the same server | |
759 @return ((D)(ProxyInfos, None)): Found proxy infos, | |
760 or None if not acceptable proxy is found | |
761 """ | |
762 def notFound(server): | |
763 log.info(u"No proxy found on this server") | |
764 self._cache_proxies[server] = None | |
765 defer.returnValue(None) | |
766 server = client.jid.host | |
767 try: | |
768 defer.returnValue(self._cache_proxies[server]) | |
769 except KeyError: | |
770 pass | |
771 try: | |
772 proxy = (yield self.host.findServiceEntities(client, 'proxy', 'bytestreams')).pop() | |
773 except (defer.CancelledError, StopIteration, KeyError): | |
774 notFound(server) | |
775 iq_elt = client.IQ('get') | |
776 iq_elt['to'] = proxy.full() | |
777 iq_elt.addElement((NS_BS, 'query')) | |
778 | |
779 try: | |
780 result_elt = yield iq_elt.send() | |
781 except jabber_error.StanzaError as failure: | |
782 log.warning(u"Error while requesting proxy info on {jid}: {error}" | |
783 .format(proxy.full(), failure)) | |
784 notFound(server) | |
785 | |
786 try: | |
787 query_elt = result_elt.elements(NS_BS, 'query').next() | |
788 streamhost_elt = query_elt.elements(NS_BS, 'streamhost').next() | |
789 host = streamhost_elt['host'] | |
790 jid_ = streamhost_elt['jid'] | |
791 port = streamhost_elt['port'] | |
792 if not all((host, jid, port)): | |
793 raise KeyError | |
794 jid_ = jid.JID(jid_) | |
795 except (StopIteration, KeyError, RuntimeError, jid.InvalidFormat, AttributeError): | |
796 log.warning(u"Invalid proxy data received from {}".format(proxy.full())) | |
797 notFound(server) | |
798 | |
799 proxy_infos = self._cache_proxies[server] = ProxyInfos(host, jid_, port) | |
800 log.info(u"Proxy found: {}".format(proxy_infos)) | |
801 defer.returnValue(proxy_infos) | |
802 | |
803 @defer.inlineCallbacks | |
804 def _getNetworkData(self, client): | |
805 """Retrieve information about network | |
806 | |
807 @param client: %(doc_client)s | |
808 @return (D(tuple[local_port, external_port, local_ips, external_ip])): network data | |
809 """ | |
810 self.getSocks5ServerFactory() | |
811 local_port = self._server_factory_port | |
812 external_ip = yield self._ip.getExternalIP(client) | |
813 local_ips = yield self._ip.getLocalIPs(client) | |
814 | |
815 if external_ip is not None and self._external_port is None: | |
816 if external_ip != local_ips[0]: | |
817 log.info(u"We are probably behind a NAT") | |
818 if self._np is None: | |
819 log.warning(u"NAT port plugin not available, we can't map port") | |
820 else: | |
821 ext_port = yield self._np.mapPort(local_port, desc=u"SaT socks5 stream") | |
822 if ext_port is None: | |
823 log.warning(u"Can't map NAT port") | |
824 else: | |
825 self._external_port = ext_port | |
826 | |
827 defer.returnValue((local_port, self._external_port, local_ips, external_ip)) | |
828 | |
829 @defer.inlineCallbacks | |
830 def getCandidates(self, client): | |
831 """Return a list of our stream candidates | |
832 | |
833 @return (D(list[Candidate])): list of candidates, ordered by priority | |
834 """ | |
835 server_factory = yield self.getSocks5ServerFactory() | |
836 local_port, ext_port, local_ips, external_ip = yield self._getNetworkData(client) | |
837 proxy = yield self.getProxy(client) | |
838 | |
839 # its time to gather the candidates | |
840 candidates = [] | |
841 | |
842 # first the direct ones | |
843 | |
844 # the preferred direct connection | |
845 ip = local_ips.pop(0) | |
846 candidates.append(Candidate(ip, local_port, XEP_0065.TYPE_DIRECT, PRIORITY_BEST_DIRECT, client.jid, priority_local=True, factory=server_factory)) | |
847 for ip in local_ips: | |
848 candidates.append(Candidate(ip, local_port, XEP_0065.TYPE_DIRECT, PRIORITY_DIRECT, client.jid, priority_local=True, factory=server_factory)) | |
849 | |
850 # then the assisted one | |
851 if ext_port is not None: | |
852 candidates.append(Candidate(external_ip, ext_port, XEP_0065.TYPE_ASSISTED, PRIORITY_ASSISTED, client.jid, priority_local=True, factory=server_factory)) | |
853 | |
854 # finally the proxy | |
855 if proxy: | |
856 candidates.append(Candidate(proxy.host, proxy.port, XEP_0065.TYPE_PROXY, PRIORITY_PROXY, proxy.jid, priority_local=True)) | |
857 | |
858 # should be already sorted, but just in case the priorities get weird | |
859 candidates.sort(key=lambda c: c.priority, reverse=True) | |
860 defer.returnValue(candidates) | |
861 | |
862 def _addConnector(self, connector, candidate): | |
863 """Add connector used to connect to candidate, and return client factory's connection Deferred | |
864 | |
865 the connector can be used to disconnect the candidate, and returning the factory's connection Deferred allow to wait for connection completion | |
866 @param connector: a connector implementing IConnector | |
867 @param candidate(Candidate): candidate linked to the connector | |
868 @return (D): Deferred fired when factory connection is done or has failed | |
869 """ | |
870 candidate.factory.connector = connector | |
871 return candidate.factory.connection | |
872 | |
873 def connectCandidate(self, client, candidate, session_hash, peer_session_hash=None, delay=None): | |
874 """Connect to a candidate | |
875 | |
876 Connection will be done with a Socks5ClientFactory | |
877 @param candidate(Candidate): candidate to connect to | |
878 @param session_hash(unicode): hash of the session | |
879 hash is the same as hostname computed in XEP-0065 § 5.3.2 #1 | |
880 @param peer_session_hash(unicode, None): hash used with the peer | |
881 None to use session_hash. | |
882 None must be used in 2 cases: | |
883 - when XEP-0065 is used with XEP-0096 | |
884 - when a peer connect to a proxy *he proposed himself* | |
885 in practice, peer_session_hash is only used by tryCandidates | |
886 @param delay(None, float): optional delay to wait before connection, in seconds | |
887 @return (D): Deferred launched when TCP connection + Socks5 connection is done | |
888 """ | |
889 if peer_session_hash is None: | |
890 # for XEP-0065, only one hash is needed | |
891 peer_session_hash = session_hash | |
892 session = self.getSession(client, session_hash) | |
893 factory = Socks5ClientFactory(client, self, session, peer_session_hash) | |
894 candidate.factory = factory | |
895 if delay is None: | |
896 d = defer.succeed(candidate.host) | |
897 else: | |
898 d = sat_defer.DelayedDeferred(delay, candidate.host) | |
899 d.addCallback(reactor.connectTCP, candidate.port, factory) | |
900 d.addCallback(self._addConnector, candidate) | |
901 return d | |
902 | |
903 def tryCandidates(self, client, candidates, session_hash, peer_session_hash, connection_cb=None, connection_eb=None): | |
904 defers_list = [] | |
905 | |
906 for candidate in candidates: | |
907 delay = CANDIDATE_DELAY * len(defers_list) | |
908 if candidate.type == XEP_0065.TYPE_PROXY: | |
909 delay += CANDIDATE_DELAY_PROXY | |
910 d = self.connectCandidate(client, candidate, session_hash, peer_session_hash, delay) | |
911 if connection_cb is not None: | |
912 d.addCallback(lambda dummy, candidate=candidate, client=client: connection_cb(client, candidate)) | |
913 if connection_eb is not None: | |
914 d.addErrback(connection_eb, client, candidate) | |
915 defers_list.append(d) | |
916 | |
917 return defers_list | |
918 | |
919 def getBestCandidate(self, client, candidates, session_hash, peer_session_hash=None): | |
920 """Get best candidate (according to priority) which can connect | |
921 | |
922 @param candidates(iterable[Candidate]): candidates to test | |
923 @param session_hash(unicode): hash of the session | |
924 hash is the same as hostname computed in XEP-0065 § 5.3.2 #1 | |
925 @param peer_session_hash(unicode, None): hash of the other peer | |
926 only useful for XEP-0260, must be None for XEP-0065 streamhost candidates | |
927 @return (D(None, Candidate)): best candidate or None if none can connect | |
928 """ | |
929 defer_candidates = None | |
930 | |
931 def connectionCb(client, candidate): | |
932 log.info(u"Connection of {} successful".format(unicode(candidate))) | |
933 for idx, other_candidate in enumerate(candidates): | |
934 try: | |
935 if other_candidate.priority < candidate.priority: | |
936 log.debug(u"Cancelling {}".format(other_candidate)) | |
937 defer_candidates[idx].cancel() | |
938 except AttributeError: | |
939 assert other_candidate is None | |
940 | |
941 def connectionEb(failure, client, candidate): | |
942 if failure.check(defer.CancelledError): | |
943 log.debug(u"Connection of {} has been cancelled".format(candidate)) | |
944 else: | |
945 log.info(u"Connection of {candidate} Failed: {error}".format( | |
946 candidate = candidate, | |
947 error = failure.value)) | |
948 candidates[candidates.index(candidate)] = None | |
949 | |
950 def allTested(self): | |
951 log.debug(u"All candidates have been tested") | |
952 good_candidates = [c for c in candidates if c] | |
953 return good_candidates[0] if good_candidates else None | |
954 | |
955 defer_candidates = self.tryCandidates(client, candidates, session_hash, peer_session_hash, connectionCb, connectionEb) | |
956 d_list = defer.DeferredList(defer_candidates) | |
957 d_list.addCallback(allTested) | |
958 return d_list | |
959 | |
960 def _timeOut(self, session_hash, client): | |
961 """Called when stream was not started quickly enough | |
962 | |
963 @param session_hash(str): hash as returned by getSessionHash | |
964 @param client: %(doc_client)s | |
965 """ | |
966 log.info(u"Socks5 Bytestream: TimeOut reached") | |
967 session = self.getSession(client, session_hash) | |
968 session[DEFER_KEY].errback(exceptions.TimeOutError) | |
969 | |
970 def killSession(self, failure_, session_hash, sid, client): | |
971 """Clean the current session | |
972 | |
973 @param session_hash(str): hash as returned by getSessionHash | |
974 @param sid(None, unicode): session id | |
975 or None if self.xep_0065_sid_session was not used | |
976 @param client: %(doc_client)s | |
977 @param failure_(None, failure.Failure): None if eveything was fine, a failure else | |
978 @return (None, failure.Failure): failure_ is returned | |
979 """ | |
980 log.debug(u'Cleaning session with hash {hash}{id}: {reason}'.format( | |
981 hash=session_hash, | |
982 reason='' if failure_ is None else failure_.value, | |
983 id='' if sid is None else u' (id: {})'.format(sid), | |
984 )) | |
985 | |
986 try: | |
987 assert self.hash_clients_map[session_hash] == client | |
988 del self.hash_clients_map[session_hash] | |
989 except KeyError: | |
990 pass | |
991 | |
992 if sid is not None: | |
993 try: | |
994 del client.xep_0065_sid_session[sid] | |
995 except KeyError: | |
996 log.warning(u"Session id {} is unknown".format(sid)) | |
997 | |
998 try: | |
999 session_data = client._s5b_sessions[session_hash] | |
1000 except KeyError: | |
1001 log.warning(u"There is no session with this hash") | |
1002 return | |
1003 else: | |
1004 del client._s5b_sessions[session_hash] | |
1005 | |
1006 try: | |
1007 session_data['timer'].cancel() | |
1008 except (internet_error.AlreadyCalled, internet_error.AlreadyCancelled): | |
1009 pass | |
1010 | |
1011 return failure_ | |
1012 | |
1013 def startStream(self, client, stream_object, to_jid, sid): | |
1014 """Launch the stream workflow | |
1015 | |
1016 @param streamProducer: stream_object to use | |
1017 @param to_jid: JID of the recipient | |
1018 @param sid: Stream session id | |
1019 @param successCb: method to call when stream successfuly finished | |
1020 @param failureCb: method to call when something goes wrong | |
1021 @return (D): Deferred fired when session is finished | |
1022 """ | |
1023 session_data = self._createSession(client, stream_object, to_jid, sid, True) | |
1024 | |
1025 session_data[client] = client | |
1026 | |
1027 def gotCandidates(candidates): | |
1028 session_data['candidates'] = candidates | |
1029 iq_elt = client.IQ() | |
1030 iq_elt["from"] = client.jid.full() | |
1031 iq_elt["to"] = to_jid.full() | |
1032 query_elt = iq_elt.addElement((NS_BS, 'query')) | |
1033 query_elt['mode'] = 'tcp' | |
1034 query_elt['sid'] = sid | |
1035 | |
1036 for candidate in candidates: | |
1037 streamhost = query_elt.addElement('streamhost') | |
1038 streamhost['host'] = candidate.host | |
1039 streamhost['port'] = str(candidate.port) | |
1040 streamhost['jid'] = candidate.jid.full() | |
1041 log.debug(u"Candidate proposed: {}".format(candidate)) | |
1042 | |
1043 d = iq_elt.send() | |
1044 args = [session_data, client] | |
1045 d.addCallbacks(self._IQNegotiationCb, self._IQNegotiationEb, args, None, args) | |
1046 | |
1047 self.getCandidates(client).addCallback(gotCandidates) | |
1048 return session_data[DEFER_KEY] | |
1049 | |
1050 def _IQNegotiationCb(self, iq_elt, session_data, client): | |
1051 """Called when the result of open iq is received | |
1052 | |
1053 @param session_data(dict): data of the session | |
1054 @param client: %(doc_client)s | |
1055 @param iq_elt(domish.Element): <iq> result | |
1056 """ | |
1057 try: | |
1058 query_elt = iq_elt.elements(NS_BS, 'query').next() | |
1059 streamhost_used_elt = query_elt.elements(NS_BS, 'streamhost-used').next() | |
1060 except StopIteration: | |
1061 log.warning(u"No streamhost found in stream query") | |
1062 # FIXME: must clean session | |
1063 return | |
1064 | |
1065 streamhost_jid = jid.JID(streamhost_used_elt['jid']) | |
1066 try: | |
1067 candidate = (c for c in session_data['candidates'] if c.jid == streamhost_jid).next() | |
1068 except StopIteration: | |
1069 log.warning(u"Candidate [{jid}] is unknown !".format(jid=streamhost_jid.full())) | |
1070 return | |
1071 else: | |
1072 log.info(u"Candidate choosed by target: {}".format(candidate)) | |
1073 | |
1074 if candidate.type == XEP_0065.TYPE_PROXY: | |
1075 log.info(u"A Socks5 proxy is used") | |
1076 d = self.connectCandidate(client, candidate, session_data['hash']) | |
1077 d.addCallback(lambda dummy: candidate.activate(session_data['id'], session_data['peer_jid'], client)) | |
1078 d.addErrback(self._activationEb) | |
1079 else: | |
1080 d = defer.succeed(None) | |
1081 | |
1082 d.addCallback(lambda dummy: candidate.startTransfer(session_data['hash'])) | |
1083 | |
1084 def _activationEb(self, failure): | |
1085 log.warning(u"Proxy activation error: {}".format(failure.value)) | |
1086 | |
1087 def _IQNegotiationEb(self, stanza_err, session_data, client): | |
1088 log.warning(u"Socks5 transfer failed: {}".format(stanza_err.value)) | |
1089 # FIXME: must clean session | |
1090 | |
1091 def createSession(self, *args, **kwargs): | |
1092 """like [_createSession] but return the session deferred instead of the whole session | |
1093 | |
1094 session deferred is fired when transfer is finished | |
1095 """ | |
1096 return self._createSession(*args, **kwargs)[DEFER_KEY] | |
1097 | |
1098 def _createSession(self, client, stream_object, to_jid, sid, requester=False): | |
1099 """Called when a bytestream is imminent | |
1100 | |
1101 @param stream_object(iface.IStreamProducer): File object where data will be written | |
1102 @param to_jid(jid.JId): jid of the other peer | |
1103 @param sid(unicode): session id | |
1104 @param initiator(bool): if True, this session is create by initiator | |
1105 @return (dict): session data | |
1106 """ | |
1107 if sid in client.xep_0065_sid_session: | |
1108 raise exceptions.ConflictError(u'A session with this id already exists !') | |
1109 if requester: | |
1110 session_hash = getSessionHash(client.jid, to_jid, sid) | |
1111 session_data = self._registerHash(client, session_hash, stream_object) | |
1112 else: | |
1113 session_hash = getSessionHash(to_jid, client.jid, sid) | |
1114 session_d = defer.Deferred() | |
1115 session_d.addBoth(self.killSession, session_hash, sid, client) | |
1116 session_data = client._s5b_sessions[session_hash] = { | |
1117 DEFER_KEY: session_d, | |
1118 TIMER_KEY: reactor.callLater(TIMEOUT, self._timeOut, session_hash, client), | |
1119 } | |
1120 client.xep_0065_sid_session[sid] = session_data | |
1121 session_data.update( | |
1122 {'id': sid, | |
1123 'peer_jid': to_jid, | |
1124 'stream_object': stream_object, | |
1125 'hash': session_hash, | |
1126 }) | |
1127 | |
1128 return session_data | |
1129 | |
1130 def getSession(self, client, session_hash): | |
1131 """Return session data | |
1132 | |
1133 @param session_hash(unicode): hash of the session | |
1134 hash is the same as hostname computed in XEP-0065 § 5.3.2 #1 | |
1135 @param client(None, SatXMPPClient): client of the peer | |
1136 None is used only if client is unknown (this is only the case | |
1137 for incoming request received by Socks5ServerFactory). None must | |
1138 only be used by Socks5ServerFactory. | |
1139 See comments below for details | |
1140 @return (dict): session data | |
1141 """ | |
1142 if client is None: | |
1143 try: | |
1144 client = self.hash_clients_map[session_hash] | |
1145 except KeyError as e: | |
1146 log.warning(u"The requested session doesn't exists !") | |
1147 raise e | |
1148 return client._s5b_sessions[session_hash] | |
1149 | |
1150 def registerHash(self, *args, **kwargs): | |
1151 """like [_registerHash] but return the session deferred instead of the whole session | |
1152 session deferred is fired when transfer is finished | |
1153 """ | |
1154 return self._registerHash(*args, **kwargs)[DEFER_KEY] | |
1155 | |
1156 def _registerHash(self, client, session_hash, stream_object): | |
1157 """Create a session_data associated to hash | |
1158 | |
1159 @param session_hash(str): hash of the session | |
1160 @param stream_object(iface.IStreamProducer, IConsumer, None): file-like object | |
1161 None if it will be filled later | |
1162 return (dict): session data | |
1163 """ | |
1164 assert session_hash not in client._s5b_sessions | |
1165 session_d = defer.Deferred() | |
1166 session_d.addBoth(self.killSession, session_hash, None, client) | |
1167 session_data = client._s5b_sessions[session_hash] = { | |
1168 DEFER_KEY: session_d, | |
1169 TIMER_KEY: reactor.callLater(TIMEOUT, self._timeOut, session_hash, client), | |
1170 } | |
1171 | |
1172 if stream_object is not None: | |
1173 session_data['stream_object'] = stream_object | |
1174 | |
1175 assert session_hash not in self.hash_clients_map | |
1176 self.hash_clients_map[session_hash] = client | |
1177 | |
1178 return session_data | |
1179 | |
1180 def associateStreamObject(self, client, session_hash, stream_object): | |
1181 """Associate a stream object with a session""" | |
1182 session_data = self.getSession(client, session_hash) | |
1183 assert 'stream_object' not in session_data | |
1184 session_data['stream_object'] = stream_object | |
1185 | |
1186 def streamQuery(self, iq_elt, client): | |
1187 log.debug(u"BS stream query") | |
1188 | |
1189 iq_elt.handled = True | |
1190 | |
1191 query_elt = iq_elt.elements(NS_BS, 'query').next() | |
1192 try: | |
1193 sid = query_elt['sid'] | |
1194 except KeyError: | |
1195 log.warning(u"Invalid bystreams request received") | |
1196 return client.sendError(iq_elt, "bad-request") | |
1197 | |
1198 streamhost_elts = list(query_elt.elements(NS_BS, 'streamhost')) | |
1199 if not streamhost_elts: | |
1200 return client.sendError(iq_elt, "bad-request") | |
1201 | |
1202 try: | |
1203 session_data = client.xep_0065_sid_session[sid] | |
1204 except KeyError: | |
1205 log.warning(u"Ignoring unexpected BS transfer: {}".format(sid)) | |
1206 return client.sendError(iq_elt, 'not-acceptable') | |
1207 | |
1208 peer_jid = session_data["peer_jid"] = jid.JID(iq_elt["from"]) | |
1209 | |
1210 candidates = [] | |
1211 nb_sh = len(streamhost_elts) | |
1212 for idx, sh_elt in enumerate(streamhost_elts): | |
1213 try: | |
1214 host, port, jid_ = sh_elt['host'], sh_elt['port'], jid.JID(sh_elt['jid']) | |
1215 except KeyError: | |
1216 log.warning(u"malformed streamhost element") | |
1217 return client.sendError(iq_elt, "bad-request") | |
1218 priority = nb_sh - idx | |
1219 if jid_.userhostJID() != peer_jid.userhostJID(): | |
1220 type_ = XEP_0065.TYPE_PROXY | |
1221 else: | |
1222 type_ = XEP_0065.TYPE_DIRECT | |
1223 candidates.append(Candidate(host, port, type_, priority, jid_)) | |
1224 | |
1225 for candidate in candidates: | |
1226 log.info(u"Candidate proposed: {}".format(candidate)) | |
1227 | |
1228 d = self.getBestCandidate(client, candidates, session_data['hash']) | |
1229 d.addCallback(self._ackStream, iq_elt, session_data, client) | |
1230 | |
1231 def _ackStream(self, candidate, iq_elt, session_data, client): | |
1232 if candidate is None: | |
1233 log.info("No streamhost candidate worked, we have to end negotiation") | |
1234 return client.sendError(iq_elt, 'item-not-found') | |
1235 log.info(u"We choose: {}".format(candidate)) | |
1236 result_elt = xmlstream.toResponse(iq_elt, 'result') | |
1237 query_elt = result_elt.addElement((NS_BS, 'query')) | |
1238 query_elt['sid'] = session_data['id'] | |
1239 streamhost_used_elt = query_elt.addElement('streamhost-used') | |
1240 streamhost_used_elt['jid'] = candidate.jid.full() | |
1241 client.send(result_elt) | |
1242 | |
1243 | |
1244 class XEP_0065_handler(XMPPHandler): | |
1245 implements(iwokkel.IDisco) | |
1246 | |
1247 def __init__(self, plugin_parent): | |
1248 self.plugin_parent = plugin_parent | |
1249 self.host = plugin_parent.host | |
1250 | |
1251 def connectionInitialized(self): | |
1252 self.xmlstream.addObserver(BS_REQUEST, self.plugin_parent.streamQuery, client=self.parent) | |
1253 | |
1254 def getDiscoInfo(self, requestor, target, nodeIdentifier=''): | |
1255 return [disco.DiscoFeature(NS_BS)] | |
1256 | |
1257 def getDiscoItems(self, requestor, target, nodeIdentifier=''): | |
1258 return [] |