comparison sat/plugins/plugin_xep_0065.py @ 2624:56f94936df1e

code style reformatting using black
author Goffi <goffi@goffi.org>
date Wed, 27 Jun 2018 20:14:46 +0200
parents 26edcf3a30eb
children 378188abe941
comparison
equal deleted inserted replaced
2623:49533de4540b 2624:56f94936df1e
1 #!/usr/bin/env python2 1 #!/usr/bin/env python2
2 #-*- coding: utf-8 -*- 2 # -*- coding: utf-8 -*-
3 3
4 # SAT plugin for managing xep-0065 4 # SAT plugin for managing xep-0065
5 5
6 # Copyright (C) 6 # Copyright (C)
7 # 2002, 2003, 2004 Dave Smith (dizzyd@jabber.org) 7 # 2002, 2003, 2004 Dave Smith (dizzyd@jabber.org)
54 # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN 54 # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
55 # THE SOFTWARE. 55 # THE SOFTWARE.
56 56
57 from sat.core.i18n import _ 57 from sat.core.i18n import _
58 from sat.core.log import getLogger 58 from sat.core.log import getLogger
59
59 log = getLogger(__name__) 60 log = getLogger(__name__)
60 from sat.core.constants import Const as C 61 from sat.core.constants import Const as C
61 from sat.core import exceptions 62 from sat.core import exceptions
62 from sat.tools import sat_defer 63 from sat.tools import sat_defer
63 from twisted.internet import protocol 64 from twisted.internet import protocol
90 C.PI_PROTOCOLS: ["XEP-0065"], 91 C.PI_PROTOCOLS: ["XEP-0065"],
91 C.PI_DEPENDENCIES: ["IP"], 92 C.PI_DEPENDENCIES: ["IP"],
92 C.PI_RECOMMENDATIONS: ["NAT-PORT"], 93 C.PI_RECOMMENDATIONS: ["NAT-PORT"],
93 C.PI_MAIN: "XEP_0065", 94 C.PI_MAIN: "XEP_0065",
94 C.PI_HANDLER: "yes", 95 C.PI_HANDLER: "yes",
95 C.PI_DESCRIPTION: _("""Implementation of SOCKS5 Bytestreams""") 96 C.PI_DESCRIPTION: _("""Implementation of SOCKS5 Bytestreams"""),
96 } 97 }
97 98
98 IQ_SET = '/iq[@type="set"]' 99 IQ_SET = '/iq[@type="set"]'
99 NS_BS = 'http://jabber.org/protocol/bytestreams' 100 NS_BS = "http://jabber.org/protocol/bytestreams"
100 BS_REQUEST = IQ_SET + '/query[@xmlns="' + NS_BS + '"]' 101 BS_REQUEST = IQ_SET + '/query[@xmlns="' + NS_BS + '"]'
101 TIMER_KEY = 'timer' 102 TIMER_KEY = "timer"
102 DEFER_KEY = 'finished' # key of the deferred used to track session end 103 DEFER_KEY = "finished" # key of the deferred used to track session end
103 SERVER_STARTING_PORT = 0 # starting number for server port search (0 to ask automatic attribution) 104 SERVER_STARTING_PORT = (
105 0
106 ) # starting number for server port search (0 to ask automatic attribution)
104 107
105 # priorities are candidates local priorities, must be a int between 0 and 65535 108 # priorities are candidates local priorities, must be a int between 0 and 65535
106 PRIORITY_BEST_DIRECT = 10000 109 PRIORITY_BEST_DIRECT = 10000
107 PRIORITY_DIRECT = 5000 110 PRIORITY_DIRECT = 5000
108 PRIORITY_ASSISTED = 1000 111 PRIORITY_ASSISTED = 1000
109 PRIORITY_PROXY = 0.2 # proxy is the last option for s5b 112 PRIORITY_PROXY = 0.2 # proxy is the last option for s5b
110 CANDIDATE_DELAY = 0.2 # see XEP-0260 §4 113 CANDIDATE_DELAY = 0.2 # see XEP-0260 §4
111 CANDIDATE_DELAY_PROXY = 0.2 # additional time for proxy types (see XEP-0260 §4 note 3) 114 CANDIDATE_DELAY_PROXY = 0.2 # additional time for proxy types (see XEP-0260 §4 note 3)
112 115
113 TIMEOUT = 300 # maxium time between session creation and stream start 116 TIMEOUT = 300 # maxium time between session creation and stream start
114 117
115 # XXX: by default eveything is automatic 118 # XXX: by default eveything is automatic
116 # TODO: use these params to force use of specific proxy/port/IP 119 # TODO: use these params to force use of specific proxy/port/IP
117 # PARAMS = """ 120 # PARAMS = """
118 # <params> 121 # <params>
130 # </category> 133 # </category>
131 # </individual> 134 # </individual>
132 # </params> 135 # </params>
133 # """ 136 # """
134 137
135 (STATE_INITIAL, 138 (
136 STATE_AUTH, 139 STATE_INITIAL,
137 STATE_REQUEST, 140 STATE_AUTH,
138 STATE_READY, 141 STATE_REQUEST,
139 STATE_AUTH_USERPASS, 142 STATE_READY,
140 STATE_CLIENT_INITIAL, 143 STATE_AUTH_USERPASS,
141 STATE_CLIENT_AUTH, 144 STATE_CLIENT_INITIAL,
142 STATE_CLIENT_REQUEST, 145 STATE_CLIENT_AUTH,
146 STATE_CLIENT_REQUEST,
143 ) = xrange(8) 147 ) = xrange(8)
144 148
145 SOCKS5_VER = 0x05 149 SOCKS5_VER = 0x05
146 150
147 ADDR_IPV4 = 0x01 151 ADDR_IPV4 = 0x01
165 REPLY_TTL_EXPIRED = 0x06 169 REPLY_TTL_EXPIRED = 0x06
166 REPLY_CMD_NOT_SUPPORTED = 0x07 170 REPLY_CMD_NOT_SUPPORTED = 0x07
167 REPLY_ADDR_NOT_SUPPORTED = 0x08 171 REPLY_ADDR_NOT_SUPPORTED = 0x08
168 172
169 173
170 ProxyInfos = namedtuple("ProxyInfos", ['host', 'jid', 'port']) 174 ProxyInfos = namedtuple("ProxyInfos", ["host", "jid", "port"])
171 175
172 176
173 class Candidate(object): 177 class Candidate(object):
174 178 def __init__(
175 def __init__(self, host, port, type_, priority, jid_, id_=None, priority_local=False, factory=None): 179 self,
180 host,
181 port,
182 type_,
183 priority,
184 jid_,
185 id_=None,
186 priority_local=False,
187 factory=None,
188 ):
176 """ 189 """
177 @param host(unicode): host IP or domain 190 @param host(unicode): host IP or domain
178 @param port(int): port 191 @param port(int): port
179 @param type_(unicode): stream type (one of XEP_0065.TYPE_*) 192 @param type_(unicode): stream type (one of XEP_0065.TYPE_*)
180 @param priority(int): priority 193 @param priority(int): priority
182 @param id_(None, id_): Candidate ID, or None to generate 195 @param id_(None, id_): Candidate ID, or None to generate
183 @param priority_local(bool): if True, priority is used as local priority, 196 @param priority_local(bool): if True, priority is used as local priority,
184 else priority is used as global one (and local priority is set to 0) 197 else priority is used as global one (and local priority is set to 0)
185 """ 198 """
186 assert isinstance(jid_, jid.JID) 199 assert isinstance(jid_, jid.JID)
187 self.host, self.port, self.type, self.jid = ( 200 self.host, self.port, self.type, self.jid = (host, int(port), type_, jid_)
188 host, int(port), type_, jid_)
189 self.id = id_ if id_ is not None else unicode(uuid.uuid4()) 201 self.id = id_ if id_ is not None else unicode(uuid.uuid4())
190 if priority_local: 202 if priority_local:
191 self._local_priority = int(priority) 203 self._local_priority = int(priority)
192 self._priority = self.calculatePriority() 204 self._priority = self.calculatePriority()
193 else: 205 else:
202 """ 214 """
203 log.debug(u"Discarding {}".format(self)) 215 log.debug(u"Discarding {}".format(self))
204 try: 216 try:
205 self.factory.discard() 217 self.factory.discard()
206 except AttributeError: 218 except AttributeError:
207 pass # no discard for Socks5ServerFactory 219 pass # no discard for Socks5ServerFactory
208 220
209 @property 221 @property
210 def local_priority(self): 222 def local_priority(self):
211 return self._local_priority 223 return self._local_priority
212 224
216 228
217 def __str__(self): 229 def __str__(self):
218 # similar to __unicode__ but we don't show jid and we encode id 230 # similar to __unicode__ but we don't show jid and we encode id
219 return "Candidate ({0.priority}): host={0.host} port={0.port} type={0.type}{id}".format( 231 return "Candidate ({0.priority}): host={0.host} port={0.port} type={0.type}{id}".format(
220 self, 232 self,
221 id=u" id={}".format(self.id if self.id is not None else u'').encode('utf-8', 'ignore'), 233 id=u" id={}".format(self.id if self.id is not None else u"").encode(
222 ) 234 "utf-8", "ignore"
235 ),
236 )
223 237
224 def __unicode__(self): 238 def __unicode__(self):
225 return u"Candidate ({0.priority}): host={0.host} port={0.port} jid={0.jid} type={0.type}{id}".format( 239 return u"Candidate ({0.priority}): host={0.host} port={0.port} jid={0.jid} type={0.type}{id}".format(
226 self, 240 self, id=u" id={}".format(self.id if self.id is not None else u"")
227 id=u" id={}".format(self.id if self.id is not None else u''), 241 )
228 )
229 242
230 def __eq__(self, other): 243 def __eq__(self, other):
231 # self.id is is not used in __eq__ as the same candidate can have 244 # self.id is is not used in __eq__ as the same candidate can have
232 # different ids if proposed by initiator or responder 245 # different ids if proposed by initiator or responder
233 try: 246 try:
234 return (self.host == other.host and 247 return (
235 self.port == other.port and 248 self.host == other.host
236 self.jid == other.jid) 249 and self.port == other.port
250 and self.jid == other.jid
251 )
237 except (AttributeError, TypeError): 252 except (AttributeError, TypeError):
238 return False 253 return False
239 254
240 def __ne__(self, other): 255 def __ne__(self, other):
241 return not self.__eq__(other) 256 return not self.__eq__(other)
254 multiplier = 110 269 multiplier = 110
255 elif self.type == XEP_0065.TYPE_PROXY: 270 elif self.type == XEP_0065.TYPE_PROXY:
256 multiplier = 10 271 multiplier = 10
257 else: 272 else:
258 raise exceptions.InternalError(u"Unknown {} type !".format(self.type)) 273 raise exceptions.InternalError(u"Unknown {} type !".format(self.type))
259 return 2**16 * multiplier + self._local_priority 274 return 2 ** 16 * multiplier + self._local_priority
260 275
261 def activate(self, sid, peer_jid, client): 276 def activate(self, sid, peer_jid, client):
262 """Activate the proxy candidate 277 """Activate the proxy candidate
263 278
264 Send activation request as explained in XEP-0065 § 6.3.5 279 Send activation request as explained in XEP-0065 § 6.3.5
267 @param peer_jid(jid.JID): jid of the other peer 282 @param peer_jid(jid.JID): jid of the other peer
268 @return (D(domish.Element)): IQ result (or error) 283 @return (D(domish.Element)): IQ result (or error)
269 """ 284 """
270 assert self.type == XEP_0065.TYPE_PROXY 285 assert self.type == XEP_0065.TYPE_PROXY
271 iq_elt = client.IQ() 286 iq_elt = client.IQ()
272 iq_elt['to'] = self.jid.full() 287 iq_elt["to"] = self.jid.full()
273 query_elt = iq_elt.addElement((NS_BS, 'query')) 288 query_elt = iq_elt.addElement((NS_BS, "query"))
274 query_elt['sid'] = sid 289 query_elt["sid"] = sid
275 query_elt.addElement('activate', content=peer_jid.full()) 290 query_elt.addElement("activate", content=peer_jid.full())
276 return iq_elt.send() 291 return iq_elt.send()
277 292
278 def startTransfer(self, session_hash=None): 293 def startTransfer(self, session_hash=None):
279 if self.type == XEP_0065.TYPE_PROXY: 294 if self.type == XEP_0065.TYPE_PROXY:
280 chunk_size = 4096 # Prosody's proxy reject bigger chunks by default 295 chunk_size = 4096 # Prosody's proxy reject bigger chunks by default
281 else: 296 else:
282 chunk_size = None 297 chunk_size = None
283 self.factory.startTransfer(session_hash, chunk_size=chunk_size) 298 self.factory.startTransfer(session_hash, chunk_size=chunk_size)
284 299
285 300
289 @param requester_jid(jid.JID): jid of the requester (the one which activate the proxy) 304 @param requester_jid(jid.JID): jid of the requester (the one which activate the proxy)
290 @param target_jid(jid.JID): jid of the target 305 @param target_jid(jid.JID): jid of the target
291 @param sid(unicode): session id 306 @param sid(unicode): session id
292 @return (str): hash 307 @return (str): hash
293 """ 308 """
294 return hashlib.sha1((sid + requester_jid.full() + target_jid.full()).encode('utf-8')).hexdigest() 309 return hashlib.sha1(
310 (sid + requester_jid.full() + target_jid.full()).encode("utf-8")
311 ).hexdigest()
295 312
296 313
297 class SOCKSv5(protocol.Protocol): 314 class SOCKSv5(protocol.Protocol):
298 CHUNK_SIZE = 2**16 315 CHUNK_SIZE = 2 ** 16
299 316
300 def __init__(self, session_hash=None): 317 def __init__(self, session_hash=None):
301 """ 318 """
302 @param session_hash(str): hash of the session 319 @param session_hash(str): hash of the session
303 must only be used in client mode 320 must only be used in client mode
304 """ 321 """
305 self.connection = defer.Deferred() # called when connection/auth is done 322 self.connection = defer.Deferred() # called when connection/auth is done
306 if session_hash is not None: 323 if session_hash is not None:
307 self.server_mode = False 324 self.server_mode = False
308 self._session_hash = session_hash 325 self._session_hash = session_hash
309 self.state = STATE_CLIENT_INITIAL 326 self.state = STATE_CLIENT_INITIAL
310 else: 327 else:
316 self.enabledCommands = [CMD_CONNECT] 333 self.enabledCommands = [CMD_CONNECT]
317 self.peersock = None 334 self.peersock = None
318 self.addressType = 0 335 self.addressType = 0
319 self.requestType = 0 336 self.requestType = 0
320 self._stream_object = None 337 self._stream_object = None
321 self.active = False # set to True when protocol is actually used for transfer 338 self.active = False # set to True when protocol is actually used for transfer
322 # used by factories to know when the finished Deferred can be triggered 339 # used by factories to know when the finished Deferred can be triggered
323 340
324 @property 341 @property
325 def stream_object(self): 342 def stream_object(self):
326 if self._stream_object is None: 343 if self._stream_object is None:
327 self._stream_object = self.getSession()['stream_object'] 344 self._stream_object = self.getSession()["stream_object"]
328 if self.server_mode: 345 if self.server_mode:
329 self._stream_object.registerProducer(self.transport, True) 346 self._stream_object.registerProducer(self.transport, True)
330 return self._stream_object 347 return self._stream_object
331 348
332 def getSession(self): 349 def getSession(self):
340 return self.factory.getSession() 357 return self.factory.getSession()
341 358
342 def _startNegotiation(self): 359 def _startNegotiation(self):
343 log.debug("starting negotiation (client mode)") 360 log.debug("starting negotiation (client mode)")
344 self.state = STATE_CLIENT_AUTH 361 self.state = STATE_CLIENT_AUTH
345 self.transport.write(struct.pack('!3B', SOCKS5_VER, 1, AUTHMECH_ANON)) 362 self.transport.write(struct.pack("!3B", SOCKS5_VER, 1, AUTHMECH_ANON))
346 363
347 def _parseNegotiation(self): 364 def _parseNegotiation(self):
348 try: 365 try:
349 # Parse out data 366 # Parse out data
350 ver, nmethod = struct.unpack('!BB', self.buf[:2]) 367 ver, nmethod = struct.unpack("!BB", self.buf[:2])
351 methods = struct.unpack('%dB' % nmethod, self.buf[2:nmethod + 2]) 368 methods = struct.unpack("%dB" % nmethod, self.buf[2 : nmethod + 2])
352 369
353 # Ensure version is correct 370 # Ensure version is correct
354 if ver != 5: 371 if ver != 5:
355 self.transport.write(struct.pack('!BB', SOCKS5_VER, AUTHMECH_INVALID)) 372 self.transport.write(struct.pack("!BB", SOCKS5_VER, AUTHMECH_INVALID))
356 self.transport.loseConnection() 373 self.transport.loseConnection()
357 return 374 return
358 375
359 # Trim off front of the buffer 376 # Trim off front of the buffer
360 self.buf = self.buf[nmethod + 2:] 377 self.buf = self.buf[nmethod + 2 :]
361 378
362 # Check for supported auth mechs 379 # Check for supported auth mechs
363 for m in self.supportedAuthMechs: 380 for m in self.supportedAuthMechs:
364 if m in methods: 381 if m in methods:
365 # Update internal state, according to selected method 382 # Update internal state, according to selected method
366 if m == AUTHMECH_ANON: 383 if m == AUTHMECH_ANON:
367 self.state = STATE_REQUEST 384 self.state = STATE_REQUEST
368 elif m == AUTHMECH_USERPASS: 385 elif m == AUTHMECH_USERPASS:
369 self.state = STATE_AUTH_USERPASS 386 self.state = STATE_AUTH_USERPASS
370 # Complete negotiation w/ this method 387 # Complete negotiation w/ this method
371 self.transport.write(struct.pack('!BB', SOCKS5_VER, m)) 388 self.transport.write(struct.pack("!BB", SOCKS5_VER, m))
372 return 389 return
373 390
374 # No supported mechs found, notify client and close the connection 391 # No supported mechs found, notify client and close the connection
375 log.warning(u"Unsupported authentication mechanism") 392 log.warning(u"Unsupported authentication mechanism")
376 self.transport.write(struct.pack('!BB', SOCKS5_VER, AUTHMECH_INVALID)) 393 self.transport.write(struct.pack("!BB", SOCKS5_VER, AUTHMECH_INVALID))
377 self.transport.loseConnection() 394 self.transport.loseConnection()
378 except struct.error: 395 except struct.error:
379 pass 396 pass
380 397
381 def _parseUserPass(self): 398 def _parseUserPass(self):
382 try: 399 try:
383 # Parse out data 400 # Parse out data
384 ver, ulen = struct.unpack('BB', self.buf[:2]) 401 ver, ulen = struct.unpack("BB", self.buf[:2])
385 uname, = struct.unpack('%ds' % ulen, self.buf[2:ulen + 2]) 402 uname, = struct.unpack("%ds" % ulen, self.buf[2 : ulen + 2])
386 plen, = struct.unpack('B', self.buf[ulen + 2]) 403 plen, = struct.unpack("B", self.buf[ulen + 2])
387 password, = struct.unpack('%ds' % plen, self.buf[ulen + 3:ulen + 3 + plen]) 404 password, = struct.unpack("%ds" % plen, self.buf[ulen + 3 : ulen + 3 + plen])
388 # Trim off fron of the buffer 405 # Trim off fron of the buffer
389 self.buf = self.buf[3 + ulen + plen:] 406 self.buf = self.buf[3 + ulen + plen :]
390 # Fire event to authenticate user 407 # Fire event to authenticate user
391 if self.authenticateUserPass(uname, password): 408 if self.authenticateUserPass(uname, password):
392 # Signal success 409 # Signal success
393 self.state = STATE_REQUEST 410 self.state = STATE_REQUEST
394 self.transport.write(struct.pack('!BB', SOCKS5_VER, 0x00)) 411 self.transport.write(struct.pack("!BB", SOCKS5_VER, 0x00))
395 else: 412 else:
396 # Signal failure 413 # Signal failure
397 self.transport.write(struct.pack('!BB', SOCKS5_VER, 0x01)) 414 self.transport.write(struct.pack("!BB", SOCKS5_VER, 0x01))
398 self.transport.loseConnection() 415 self.transport.loseConnection()
399 except struct.error: 416 except struct.error:
400 pass 417 pass
401 418
402 def sendErrorReply(self, errorcode): 419 def sendErrorReply(self, errorcode):
403 # Any other address types are not supported 420 # Any other address types are not supported
404 result = struct.pack('!BBBBIH', SOCKS5_VER, errorcode, 0, 1, 0, 0) 421 result = struct.pack("!BBBBIH", SOCKS5_VER, errorcode, 0, 1, 0, 0)
405 self.transport.write(result) 422 self.transport.write(result)
406 self.transport.loseConnection() 423 self.transport.loseConnection()
407 424
408 def _parseRequest(self): 425 def _parseRequest(self):
409 try: 426 try:
410 # Parse out data and trim buffer accordingly 427 # Parse out data and trim buffer accordingly
411 ver, cmd, rsvd, self.addressType = struct.unpack('!BBBB', self.buf[:4]) 428 ver, cmd, rsvd, self.addressType = struct.unpack("!BBBB", self.buf[:4])
412 429
413 # Ensure we actually support the requested address type 430 # Ensure we actually support the requested address type
414 if self.addressType not in self.supportedAddrs: 431 if self.addressType not in self.supportedAddrs:
415 self.sendErrorReply(REPLY_ADDR_NOT_SUPPORTED) 432 self.sendErrorReply(REPLY_ADDR_NOT_SUPPORTED)
416 return 433 return
417 434
418 # Deal with addresses 435 # Deal with addresses
419 if self.addressType == ADDR_IPV4: 436 if self.addressType == ADDR_IPV4:
420 addr, port = struct.unpack('!IH', self.buf[4:10]) 437 addr, port = struct.unpack("!IH", self.buf[4:10])
421 self.buf = self.buf[10:] 438 self.buf = self.buf[10:]
422 elif self.addressType == ADDR_DOMAINNAME: 439 elif self.addressType == ADDR_DOMAINNAME:
423 nlen = ord(self.buf[4]) 440 nlen = ord(self.buf[4])
424 addr, port = struct.unpack('!%dsH' % nlen, self.buf[5:]) 441 addr, port = struct.unpack("!%dsH" % nlen, self.buf[5:])
425 self.buf = self.buf[7 + len(addr):] 442 self.buf = self.buf[7 + len(addr) :]
426 else: 443 else:
427 # Any other address types are not supported 444 # Any other address types are not supported
428 self.sendErrorReply(REPLY_ADDR_NOT_SUPPORTED) 445 self.sendErrorReply(REPLY_ADDR_NOT_SUPPORTED)
429 return 446 return
430 447
447 # The buffer is probably not complete, we need to wait more 464 # The buffer is probably not complete, we need to wait more
448 return None 465 return None
449 466
450 def _makeRequest(self): 467 def _makeRequest(self):
451 hash_ = self._session_hash 468 hash_ = self._session_hash
452 request = struct.pack('!5B%dsH' % len(hash_), SOCKS5_VER, CMD_CONNECT, 0, ADDR_DOMAINNAME, len(hash_), hash_, 0) 469 request = struct.pack(
470 "!5B%dsH" % len(hash_),
471 SOCKS5_VER,
472 CMD_CONNECT,
473 0,
474 ADDR_DOMAINNAME,
475 len(hash_),
476 hash_,
477 0,
478 )
453 self.transport.write(request) 479 self.transport.write(request)
454 self.state = STATE_CLIENT_REQUEST 480 self.state = STATE_CLIENT_REQUEST
455 481
456 def _parseRequestReply(self): 482 def _parseRequestReply(self):
457 try: 483 try:
458 ver, rep, rsvd, self.addressType = struct.unpack('!BBBB', self.buf[:4]) 484 ver, rep, rsvd, self.addressType = struct.unpack("!BBBB", self.buf[:4])
459 # Ensure we actually support the requested address type 485 # Ensure we actually support the requested address type
460 if self.addressType not in self.supportedAddrs: 486 if self.addressType not in self.supportedAddrs:
461 self.sendErrorReply(REPLY_ADDR_NOT_SUPPORTED) 487 self.sendErrorReply(REPLY_ADDR_NOT_SUPPORTED)
462 return 488 return
463 489
464 # Deal with addresses 490 # Deal with addresses
465 if self.addressType == ADDR_IPV4: 491 if self.addressType == ADDR_IPV4:
466 addr, port = struct.unpack('!IH', self.buf[4:10]) 492 addr, port = struct.unpack("!IH", self.buf[4:10])
467 self.buf = self.buf[10:] 493 self.buf = self.buf[10:]
468 elif self.addressType == ADDR_DOMAINNAME: 494 elif self.addressType == ADDR_DOMAINNAME:
469 nlen = ord(self.buf[4]) 495 nlen = ord(self.buf[4])
470 addr, port = struct.unpack('!%dsH' % nlen, self.buf[5:]) 496 addr, port = struct.unpack("!%dsH" % nlen, self.buf[5:])
471 self.buf = self.buf[7 + len(addr):] 497 self.buf = self.buf[7 + len(addr) :]
472 else: 498 else:
473 # Any other address types are not supported 499 # Any other address types are not supported
474 self.sendErrorReply(REPLY_ADDR_NOT_SUPPORTED) 500 self.sendErrorReply(REPLY_ADDR_NOT_SUPPORTED)
475 return 501 return
476 502
485 except struct.error: 511 except struct.error:
486 # The buffer is probably not complete, we need to wait more 512 # The buffer is probably not complete, we need to wait more
487 return None 513 return None
488 514
489 def connectionMade(self): 515 def connectionMade(self):
490 log.debug(u"Socks5 connectionMade (mode = {})".format("server" if self.state == STATE_INITIAL else "client")) 516 log.debug(
517 u"Socks5 connectionMade (mode = {})".format(
518 "server" if self.state == STATE_INITIAL else "client"
519 )
520 )
491 if self.state == STATE_CLIENT_INITIAL: 521 if self.state == STATE_CLIENT_INITIAL:
492 self._startNegotiation() 522 self._startNegotiation()
493 523
494 def connectRequested(self, addr, port): 524 def connectRequested(self, addr, port):
495 # Check that this session is expected 525 # Check that this session is expected
496 if not self.factory.addToSession(addr, self): 526 if not self.factory.addToSession(addr, self):
497 self.sendErrorReply(REPLY_CONN_REFUSED) 527 self.sendErrorReply(REPLY_CONN_REFUSED)
498 log.warning(u"Unexpected connection request received from {host}" 528 log.warning(
499 .format(host=self.transport.getPeer().host)) 529 u"Unexpected connection request received from {host}".format(
530 host=self.transport.getPeer().host
531 )
532 )
500 return 533 return
501 self._session_hash = addr 534 self._session_hash = addr
502 self.connectCompleted(addr, 0) 535 self.connectCompleted(addr, 0)
503 536
504 def startTransfer(self, chunk_size): 537 def startTransfer(self, chunk_size):
517 log.info(_("File transfer completed, closing connection")) 550 log.info(_("File transfer completed, closing connection"))
518 self.transport.loseConnection() 551 self.transport.loseConnection()
519 552
520 def connectCompleted(self, remotehost, remoteport): 553 def connectCompleted(self, remotehost, remoteport):
521 if self.addressType == ADDR_IPV4: 554 if self.addressType == ADDR_IPV4:
522 result = struct.pack('!BBBBIH', SOCKS5_VER, REPLY_SUCCESS, 0, 1, remotehost, remoteport) 555 result = struct.pack(
556 "!BBBBIH", SOCKS5_VER, REPLY_SUCCESS, 0, 1, remotehost, remoteport
557 )
523 elif self.addressType == ADDR_DOMAINNAME: 558 elif self.addressType == ADDR_DOMAINNAME:
524 result = struct.pack('!BBBBB%dsH' % len(remotehost), SOCKS5_VER, REPLY_SUCCESS, 0, 559 result = struct.pack(
525 ADDR_DOMAINNAME, len(remotehost), remotehost, remoteport) 560 "!BBBBB%dsH" % len(remotehost),
561 SOCKS5_VER,
562 REPLY_SUCCESS,
563 0,
564 ADDR_DOMAINNAME,
565 len(remotehost),
566 remotehost,
567 remoteport,
568 )
526 self.transport.write(result) 569 self.transport.write(result)
527 self.state = STATE_READY 570 self.state = STATE_READY
528 571
529 def bindRequested(self, addr, port): 572 def bindRequested(self, addr, port):
530 pass 573 pass
551 if self.state == STATE_REQUEST: 594 if self.state == STATE_REQUEST:
552 self._parseRequest() 595 self._parseRequest()
553 if self.state == STATE_CLIENT_REQUEST: 596 if self.state == STATE_CLIENT_REQUEST:
554 self._parseRequestReply() 597 self._parseRequestReply()
555 if self.state == STATE_CLIENT_AUTH: 598 if self.state == STATE_CLIENT_AUTH:
556 ver, method = struct.unpack('!BB', buf) 599 ver, method = struct.unpack("!BB", buf)
557 self.buf = self.buf[2:] 600 self.buf = self.buf[2:]
558 if ver != SOCKS5_VER or method != AUTHMECH_ANON: 601 if ver != SOCKS5_VER or method != AUTHMECH_ANON:
559 self.transport.loseConnection() 602 self.transport.loseConnection()
560 else: 603 else:
561 self._makeRequest() 604 self._makeRequest()
562 605
563 def connectionLost(self, reason): 606 def connectionLost(self, reason):
564 log.debug(u"Socks5 connection lost: {}".format(reason.value)) 607 log.debug(u"Socks5 connection lost: {}".format(reason.value))
565 if self.state != STATE_READY: 608 if self.state != STATE_READY:
566 self.connection.errback(reason) 609 self.connection.errback(reason)
567 if self.server_mode : 610 if self.server_mode:
568 self.factory.removeFromSession(self._session_hash, self, reason) 611 self.factory.removeFromSession(self._session_hash, self, reason)
569 612
570 613
571 class Socks5ServerFactory(protocol.ServerFactory): 614 class Socks5ServerFactory(protocol.ServerFactory):
572 protocol = SOCKSv5 615 protocol = SOCKSv5
581 return self.parent.getSession(None, session_hash) 624 return self.parent.getSession(None, session_hash)
582 625
583 def startTransfer(self, session_hash, chunk_size=None): 626 def startTransfer(self, session_hash, chunk_size=None):
584 session = self.getSession(session_hash) 627 session = self.getSession(session_hash)
585 try: 628 try:
586 protocol = session['protocols'][0] 629 protocol = session["protocols"][0]
587 except (KeyError, IndexError): 630 except (KeyError, IndexError):
588 log.error(u"Can't start file transfer, can't find protocol") 631 log.error(u"Can't start file transfer, can't find protocol")
589 else: 632 else:
590 session[TIMER_KEY].cancel() 633 session[TIMER_KEY].cancel()
591 protocol.startTransfer(chunk_size) 634 protocol.startTransfer(chunk_size)
601 try: 644 try:
602 session_data = self.getSession(session_hash) 645 session_data = self.getSession(session_hash)
603 except KeyError: 646 except KeyError:
604 return False 647 return False
605 else: 648 else:
606 session_data.setdefault('protocols', []).append(protocol) 649 session_data.setdefault("protocols", []).append(protocol)
607 return True 650 return True
608 651
609 def removeFromSession(self, session_hash, protocol, reason): 652 def removeFromSession(self, session_hash, protocol, reason):
610 """Remove a protocol from session_data 653 """Remove a protocol from session_data
611 654
614 @param session_hash(str): hash of the session 657 @param session_hash(str): hash of the session
615 @param protocol(SOCKSv5): protocol instance 658 @param protocol(SOCKSv5): protocol instance
616 @param reason(failure.Failure): reason of the removal 659 @param reason(failure.Failure): reason of the removal
617 """ 660 """
618 try: 661 try:
619 protocols = self.getSession(session_hash)['protocols'] 662 protocols = self.getSession(session_hash)["protocols"]
620 protocols.remove(protocol) 663 protocols.remove(protocol)
621 except (KeyError, ValueError): 664 except (KeyError, ValueError):
622 log.error(u"Protocol not found in session while it should be there") 665 log.error(u"Protocol not found in session while it should be there")
623 else: 666 else:
624 if protocol.active: 667 if protocol.active:
684 return p 727 return p
685 728
686 729
687 class XEP_0065(object): 730 class XEP_0065(object):
688 NAMESPACE = NS_BS 731 NAMESPACE = NS_BS
689 TYPE_DIRECT = 'direct' 732 TYPE_DIRECT = "direct"
690 TYPE_ASSISTED = 'assisted' 733 TYPE_ASSISTED = "assisted"
691 TYPE_TUNEL = 'tunel' 734 TYPE_TUNEL = "tunel"
692 TYPE_PROXY = 'proxy' 735 TYPE_PROXY = "proxy"
693 Candidate = Candidate 736 Candidate = Candidate
694 737
695 def __init__(self, host): 738 def __init__(self, host):
696 log.info(_("Plugin XEP_0065 initialization")) 739 log.info(_("Plugin XEP_0065 initialization"))
697 self.host = host 740 self.host = host
698 741
699 # session data 742 # session data
700 self.hash_clients_map = {} # key: hash of the transfer session, value: session data 743 self.hash_clients_map = {} # key: hash of the transfer session, value: session data
701 self._cache_proxies = {} # key: server jid, value: proxy data 744 self._cache_proxies = {} # key: server jid, value: proxy data
702 745
703 # misc data 746 # misc data
704 self._server_factory = None 747 self._server_factory = None
705 self._external_port = None 748 self._external_port = None
706 749
707 # plugins shortcuts 750 # plugins shortcuts
708 self._ip = self.host.plugins['IP'] 751 self._ip = self.host.plugins["IP"]
709 try: 752 try:
710 self._np = self.host.plugins['NAT-PORT'] 753 self._np = self.host.plugins["NAT-PORT"]
711 except KeyError: 754 except KeyError:
712 log.debug(u"NAT Port plugin not available") 755 log.debug(u"NAT Port plugin not available")
713 self._np = None 756 self._np = None
714 757
715 # parameters 758 # parameters
737 self._server_factory = Socks5ServerFactory(self) 780 self._server_factory = Socks5ServerFactory(self)
738 for port in xrange(SERVER_STARTING_PORT, 65356): 781 for port in xrange(SERVER_STARTING_PORT, 65356):
739 try: 782 try:
740 listening_port = reactor.listenTCP(port, self._server_factory) 783 listening_port = reactor.listenTCP(port, self._server_factory)
741 except internet_error.CannotListenError as e: 784 except internet_error.CannotListenError as e:
742 log.debug(u"Cannot listen on port {port}: {err_msg}{err_num}".format( 785 log.debug(
743 port=port, 786 u"Cannot listen on port {port}: {err_msg}{err_num}".format(
744 err_msg=e.socketError.strerror, 787 port=port,
745 err_num=u' (error code: {})'.format(e.socketError.errno), 788 err_msg=e.socketError.strerror,
746 )) 789 err_num=u" (error code: {})".format(e.socketError.errno),
790 )
791 )
747 else: 792 else:
748 self._server_factory_port = listening_port.getHost().port 793 self._server_factory_port = listening_port.getHost().port
749 break 794 break
750 795
751 log.info(_("Socks5 Stream server launched on port {}").format(self._server_factory_port)) 796 log.info(
797 _("Socks5 Stream server launched on port {}").format(
798 self._server_factory_port
799 )
800 )
752 return self._server_factory 801 return self._server_factory
753 802
754 @defer.inlineCallbacks 803 @defer.inlineCallbacks
755 def getProxy(self, client): 804 def getProxy(self, client):
756 """Return the proxy available for this profile 805 """Return the proxy available for this profile
757 806
758 cache is used between clients using the same server 807 cache is used between clients using the same server
759 @return ((D)(ProxyInfos, None)): Found proxy infos, 808 @return ((D)(ProxyInfos, None)): Found proxy infos,
760 or None if not acceptable proxy is found 809 or None if not acceptable proxy is found
761 """ 810 """
811
762 def notFound(server): 812 def notFound(server):
763 log.info(u"No proxy found on this server") 813 log.info(u"No proxy found on this server")
764 self._cache_proxies[server] = None 814 self._cache_proxies[server] = None
765 defer.returnValue(None) 815 defer.returnValue(None)
816
766 server = client.jid.host 817 server = client.jid.host
767 try: 818 try:
768 defer.returnValue(self._cache_proxies[server]) 819 defer.returnValue(self._cache_proxies[server])
769 except KeyError: 820 except KeyError:
770 pass 821 pass
771 try: 822 try:
772 proxy = (yield self.host.findServiceEntities(client, 'proxy', 'bytestreams')).pop() 823 proxy = (
824 yield self.host.findServiceEntities(client, "proxy", "bytestreams")
825 ).pop()
773 except (defer.CancelledError, StopIteration, KeyError): 826 except (defer.CancelledError, StopIteration, KeyError):
774 notFound(server) 827 notFound(server)
775 iq_elt = client.IQ('get') 828 iq_elt = client.IQ("get")
776 iq_elt['to'] = proxy.full() 829 iq_elt["to"] = proxy.full()
777 iq_elt.addElement((NS_BS, 'query')) 830 iq_elt.addElement((NS_BS, "query"))
778 831
779 try: 832 try:
780 result_elt = yield iq_elt.send() 833 result_elt = yield iq_elt.send()
781 except jabber_error.StanzaError as failure: 834 except jabber_error.StanzaError as failure:
782 log.warning(u"Error while requesting proxy info on {jid}: {error}" 835 log.warning(
783 .format(proxy.full(), failure)) 836 u"Error while requesting proxy info on {jid}: {error}".format(
837 proxy.full(), failure
838 )
839 )
784 notFound(server) 840 notFound(server)
785 841
786 try: 842 try:
787 query_elt = result_elt.elements(NS_BS, 'query').next() 843 query_elt = result_elt.elements(NS_BS, "query").next()
788 streamhost_elt = query_elt.elements(NS_BS, 'streamhost').next() 844 streamhost_elt = query_elt.elements(NS_BS, "streamhost").next()
789 host = streamhost_elt['host'] 845 host = streamhost_elt["host"]
790 jid_ = streamhost_elt['jid'] 846 jid_ = streamhost_elt["jid"]
791 port = streamhost_elt['port'] 847 port = streamhost_elt["port"]
792 if not all((host, jid, port)): 848 if not all((host, jid, port)):
793 raise KeyError 849 raise KeyError
794 jid_ = jid.JID(jid_) 850 jid_ = jid.JID(jid_)
795 except (StopIteration, KeyError, RuntimeError, jid.InvalidFormat, AttributeError): 851 except (StopIteration, KeyError, RuntimeError, jid.InvalidFormat, AttributeError):
796 log.warning(u"Invalid proxy data received from {}".format(proxy.full())) 852 log.warning(u"Invalid proxy data received from {}".format(proxy.full()))
816 if external_ip != local_ips[0]: 872 if external_ip != local_ips[0]:
817 log.info(u"We are probably behind a NAT") 873 log.info(u"We are probably behind a NAT")
818 if self._np is None: 874 if self._np is None:
819 log.warning(u"NAT port plugin not available, we can't map port") 875 log.warning(u"NAT port plugin not available, we can't map port")
820 else: 876 else:
821 ext_port = yield self._np.mapPort(local_port, desc=u"SaT socks5 stream") 877 ext_port = yield self._np.mapPort(
878 local_port, desc=u"SaT socks5 stream"
879 )
822 if ext_port is None: 880 if ext_port is None:
823 log.warning(u"Can't map NAT port") 881 log.warning(u"Can't map NAT port")
824 else: 882 else:
825 self._external_port = ext_port 883 self._external_port = ext_port
826 884
841 899
842 # first the direct ones 900 # first the direct ones
843 901
844 # the preferred direct connection 902 # the preferred direct connection
845 ip = local_ips.pop(0) 903 ip = local_ips.pop(0)
846 candidates.append(Candidate(ip, local_port, XEP_0065.TYPE_DIRECT, PRIORITY_BEST_DIRECT, client.jid, priority_local=True, factory=server_factory)) 904 candidates.append(
905 Candidate(
906 ip,
907 local_port,
908 XEP_0065.TYPE_DIRECT,
909 PRIORITY_BEST_DIRECT,
910 client.jid,
911 priority_local=True,
912 factory=server_factory,
913 )
914 )
847 for ip in local_ips: 915 for ip in local_ips:
848 candidates.append(Candidate(ip, local_port, XEP_0065.TYPE_DIRECT, PRIORITY_DIRECT, client.jid, priority_local=True, factory=server_factory)) 916 candidates.append(
917 Candidate(
918 ip,
919 local_port,
920 XEP_0065.TYPE_DIRECT,
921 PRIORITY_DIRECT,
922 client.jid,
923 priority_local=True,
924 factory=server_factory,
925 )
926 )
849 927
850 # then the assisted one 928 # then the assisted one
851 if ext_port is not None: 929 if ext_port is not None:
852 candidates.append(Candidate(external_ip, ext_port, XEP_0065.TYPE_ASSISTED, PRIORITY_ASSISTED, client.jid, priority_local=True, factory=server_factory)) 930 candidates.append(
931 Candidate(
932 external_ip,
933 ext_port,
934 XEP_0065.TYPE_ASSISTED,
935 PRIORITY_ASSISTED,
936 client.jid,
937 priority_local=True,
938 factory=server_factory,
939 )
940 )
853 941
854 # finally the proxy 942 # finally the proxy
855 if proxy: 943 if proxy:
856 candidates.append(Candidate(proxy.host, proxy.port, XEP_0065.TYPE_PROXY, PRIORITY_PROXY, proxy.jid, priority_local=True)) 944 candidates.append(
945 Candidate(
946 proxy.host,
947 proxy.port,
948 XEP_0065.TYPE_PROXY,
949 PRIORITY_PROXY,
950 proxy.jid,
951 priority_local=True,
952 )
953 )
857 954
858 # should be already sorted, but just in case the priorities get weird 955 # should be already sorted, but just in case the priorities get weird
859 candidates.sort(key=lambda c: c.priority, reverse=True) 956 candidates.sort(key=lambda c: c.priority, reverse=True)
860 defer.returnValue(candidates) 957 defer.returnValue(candidates)
861 958
868 @return (D): Deferred fired when factory connection is done or has failed 965 @return (D): Deferred fired when factory connection is done or has failed
869 """ 966 """
870 candidate.factory.connector = connector 967 candidate.factory.connector = connector
871 return candidate.factory.connection 968 return candidate.factory.connection
872 969
873 def connectCandidate(self, client, candidate, session_hash, peer_session_hash=None, delay=None): 970 def connectCandidate(
971 self, client, candidate, session_hash, peer_session_hash=None, delay=None
972 ):
874 """Connect to a candidate 973 """Connect to a candidate
875 974
876 Connection will be done with a Socks5ClientFactory 975 Connection will be done with a Socks5ClientFactory
877 @param candidate(Candidate): candidate to connect to 976 @param candidate(Candidate): candidate to connect to
878 @param session_hash(unicode): hash of the session 977 @param session_hash(unicode): hash of the session
898 d = sat_defer.DelayedDeferred(delay, candidate.host) 997 d = sat_defer.DelayedDeferred(delay, candidate.host)
899 d.addCallback(reactor.connectTCP, candidate.port, factory) 998 d.addCallback(reactor.connectTCP, candidate.port, factory)
900 d.addCallback(self._addConnector, candidate) 999 d.addCallback(self._addConnector, candidate)
901 return d 1000 return d
902 1001
def tryCandidates(
    self,
    client,
    candidates,
    session_hash,
    peer_session_hash,
    connection_cb=None,
    connection_eb=None,
):
    """Start a staggered connection attempt for each candidate

    Every attempt goes through self.connectCandidate. The n-th candidate
    (counting from 0) is delayed by n * CANDIDATE_DELAY, and candidates of
    type XEP_0065.TYPE_PROXY wait CANDIDATE_DELAY_PROXY more.
    @param client: passed through to connectCandidate and to the callbacks
    @param candidates(iterable[Candidate]): candidates to try, in order
    @param session_hash(unicode): hash of the session
    @param peer_session_hash: passed through to connectCandidate
    @param connection_cb(callable, None): if set, called as
        connection_cb(client, candidate) when the attempt succeeds
    @param connection_eb(callable, None): if set, called as
        connection_eb(failure, client, candidate) when the attempt fails;
        it is chained after connection_cb, so it also receives failures
        raised by connection_cb
    @return (list): one Deferred per candidate, in the same order
    """
    attempts = []

    # one attempt is appended per iteration, so the enumerate index always
    # equals the number of attempts already started
    for rank, candidate in enumerate(candidates):
        wait = CANDIDATE_DELAY * rank
        if candidate.type == XEP_0065.TYPE_PROXY:
            wait += CANDIDATE_DELAY_PROXY
        attempt = self.connectCandidate(
            client, candidate, session_hash, peer_session_hash, wait
        )

        if connection_cb is not None:
            # default arguments freeze the current candidate/client, so the
            # callback doesn't see values from a later iteration
            def onConnected(dummy, candidate=candidate, client=client):
                return connection_cb(client, candidate)

            attempt.addCallback(onConnected)
        if connection_eb is not None:
            attempt.addErrback(connection_eb, client, candidate)
        attempts.append(attempt)

    return attempts
940 1053
941 def connectionEb(failure, client, candidate): 1054 def connectionEb(failure, client, candidate):
942 if failure.check(defer.CancelledError): 1055 if failure.check(defer.CancelledError):
943 log.debug(u"Connection of {} has been cancelled".format(candidate)) 1056 log.debug(u"Connection of {} has been cancelled".format(candidate))
944 else: 1057 else:
945 log.info(u"Connection of {candidate} Failed: {error}".format( 1058 log.info(
946 candidate = candidate, 1059 u"Connection of {candidate} Failed: {error}".format(
947 error = failure.value)) 1060 candidate=candidate, error=failure.value
1061 )
1062 )
948 candidates[candidates.index(candidate)] = None 1063 candidates[candidates.index(candidate)] = None
949 1064
950 def allTested(self): 1065 def allTested(self):
951 log.debug(u"All candidates have been tested") 1066 log.debug(u"All candidates have been tested")
952 good_candidates = [c for c in candidates if c] 1067 good_candidates = [c for c in candidates if c]
953 return good_candidates[0] if good_candidates else None 1068 return good_candidates[0] if good_candidates else None
954 1069
955 defer_candidates = self.tryCandidates(client, candidates, session_hash, peer_session_hash, connectionCb, connectionEb) 1070 defer_candidates = self.tryCandidates(
1071 client,
1072 candidates,
1073 session_hash,
1074 peer_session_hash,
1075 connectionCb,
1076 connectionEb,
1077 )
956 d_list = defer.DeferredList(defer_candidates) 1078 d_list = defer.DeferredList(defer_candidates)
957 d_list.addCallback(allTested) 1079 d_list.addCallback(allTested)
958 return d_list 1080 return d_list
959 1081
960 def _timeOut(self, session_hash, client): 1082 def _timeOut(self, session_hash, client):
975 or None if self.xep_0065_sid_session was not used 1097 or None if self.xep_0065_sid_session was not used
976 @param client: %(doc_client)s 1098 @param client: %(doc_client)s
977 @param failure_(None, failure.Failure): None if eveything was fine, a failure else 1099 @param failure_(None, failure.Failure): None if eveything was fine, a failure else
978 @return (None, failure.Failure): failure_ is returned 1100 @return (None, failure.Failure): failure_ is returned
979 """ 1101 """
980 log.debug(u'Cleaning session with hash {hash}{id}: {reason}'.format( 1102 log.debug(
981 hash=session_hash, 1103 u"Cleaning session with hash {hash}{id}: {reason}".format(
982 reason='' if failure_ is None else failure_.value, 1104 hash=session_hash,
983 id='' if sid is None else u' (id: {})'.format(sid), 1105 reason="" if failure_ is None else failure_.value,
984 )) 1106 id="" if sid is None else u" (id: {})".format(sid),
1107 )
1108 )
985 1109
986 try: 1110 try:
987 assert self.hash_clients_map[session_hash] == client 1111 assert self.hash_clients_map[session_hash] == client
988 del self.hash_clients_map[session_hash] 1112 del self.hash_clients_map[session_hash]
989 except KeyError: 1113 except KeyError:
1002 return 1126 return
1003 else: 1127 else:
1004 del client._s5b_sessions[session_hash] 1128 del client._s5b_sessions[session_hash]
1005 1129
1006 try: 1130 try:
1007 session_data['timer'].cancel() 1131 session_data["timer"].cancel()
1008 except (internet_error.AlreadyCalled, internet_error.AlreadyCancelled): 1132 except (internet_error.AlreadyCalled, internet_error.AlreadyCancelled):
1009 pass 1133 pass
1010 1134
1011 return failure_ 1135 return failure_
1012 1136
1023 session_data = self._createSession(client, stream_object, to_jid, sid, True) 1147 session_data = self._createSession(client, stream_object, to_jid, sid, True)
1024 1148
1025 session_data[client] = client 1149 session_data[client] = client
1026 1150
1027 def gotCandidates(candidates): 1151 def gotCandidates(candidates):
1028 session_data['candidates'] = candidates 1152 session_data["candidates"] = candidates
1029 iq_elt = client.IQ() 1153 iq_elt = client.IQ()
1030 iq_elt["from"] = client.jid.full() 1154 iq_elt["from"] = client.jid.full()
1031 iq_elt["to"] = to_jid.full() 1155 iq_elt["to"] = to_jid.full()
1032 query_elt = iq_elt.addElement((NS_BS, 'query')) 1156 query_elt = iq_elt.addElement((NS_BS, "query"))
1033 query_elt['mode'] = 'tcp' 1157 query_elt["mode"] = "tcp"
1034 query_elt['sid'] = sid 1158 query_elt["sid"] = sid
1035 1159
1036 for candidate in candidates: 1160 for candidate in candidates:
1037 streamhost = query_elt.addElement('streamhost') 1161 streamhost = query_elt.addElement("streamhost")
1038 streamhost['host'] = candidate.host 1162 streamhost["host"] = candidate.host
1039 streamhost['port'] = str(candidate.port) 1163 streamhost["port"] = str(candidate.port)
1040 streamhost['jid'] = candidate.jid.full() 1164 streamhost["jid"] = candidate.jid.full()
1041 log.debug(u"Candidate proposed: {}".format(candidate)) 1165 log.debug(u"Candidate proposed: {}".format(candidate))
1042 1166
1043 d = iq_elt.send() 1167 d = iq_elt.send()
1044 args = [session_data, client] 1168 args = [session_data, client]
1045 d.addCallbacks(self._IQNegotiationCb, self._IQNegotiationEb, args, None, args) 1169 d.addCallbacks(self._IQNegotiationCb, self._IQNegotiationEb, args, None, args)
1053 @param session_data(dict): data of the session 1177 @param session_data(dict): data of the session
1054 @param client: %(doc_client)s 1178 @param client: %(doc_client)s
1055 @param iq_elt(domish.Element): <iq> result 1179 @param iq_elt(domish.Element): <iq> result
1056 """ 1180 """
1057 try: 1181 try:
1058 query_elt = iq_elt.elements(NS_BS, 'query').next() 1182 query_elt = iq_elt.elements(NS_BS, "query").next()
1059 streamhost_used_elt = query_elt.elements(NS_BS, 'streamhost-used').next() 1183 streamhost_used_elt = query_elt.elements(NS_BS, "streamhost-used").next()
1060 except StopIteration: 1184 except StopIteration:
1061 log.warning(u"No streamhost found in stream query") 1185 log.warning(u"No streamhost found in stream query")
1062 # FIXME: must clean session 1186 # FIXME: must clean session
1063 return 1187 return
1064 1188
1065 streamhost_jid = jid.JID(streamhost_used_elt['jid']) 1189 streamhost_jid = jid.JID(streamhost_used_elt["jid"])
1066 try: 1190 try:
1067 candidate = (c for c in session_data['candidates'] if c.jid == streamhost_jid).next() 1191 candidate = (
1192 c for c in session_data["candidates"] if c.jid == streamhost_jid
1193 ).next()
1068 except StopIteration: 1194 except StopIteration:
1069 log.warning(u"Candidate [{jid}] is unknown !".format(jid=streamhost_jid.full())) 1195 log.warning(
1196 u"Candidate [{jid}] is unknown !".format(jid=streamhost_jid.full())
1197 )
1070 return 1198 return
1071 else: 1199 else:
1072 log.info(u"Candidate choosed by target: {}".format(candidate)) 1200 log.info(u"Candidate choosed by target: {}".format(candidate))
1073 1201
1074 if candidate.type == XEP_0065.TYPE_PROXY: 1202 if candidate.type == XEP_0065.TYPE_PROXY:
1075 log.info(u"A Socks5 proxy is used") 1203 log.info(u"A Socks5 proxy is used")
1076 d = self.connectCandidate(client, candidate, session_data['hash']) 1204 d = self.connectCandidate(client, candidate, session_data["hash"])
1077 d.addCallback(lambda dummy: candidate.activate(session_data['id'], session_data['peer_jid'], client)) 1205 d.addCallback(
1206 lambda dummy: candidate.activate(
1207 session_data["id"], session_data["peer_jid"], client
1208 )
1209 )
1078 d.addErrback(self._activationEb) 1210 d.addErrback(self._activationEb)
1079 else: 1211 else:
1080 d = defer.succeed(None) 1212 d = defer.succeed(None)
1081 1213
1082 d.addCallback(lambda dummy: candidate.startTransfer(session_data['hash'])) 1214 d.addCallback(lambda dummy: candidate.startTransfer(session_data["hash"]))
1083 1215
def _activationEb(self, failure):
    """Log a proxy activation failure

    Nothing is returned, so the failure is not propagated to the rest of
    the Deferred chain this errback is attached to.
    @param failure(failure.Failure): error raised during the activation
    """
    message = u"Proxy activation error: {}".format(failure.value)
    log.warning(message)
1086 1218
1087 def _IQNegotiationEb(self, stanza_err, session_data, client): 1219 def _IQNegotiationEb(self, stanza_err, session_data, client):
1103 @param sid(unicode): session id 1235 @param sid(unicode): session id
1104 @param initiator(bool): if True, this session is create by initiator 1236 @param initiator(bool): if True, this session is create by initiator
1105 @return (dict): session data 1237 @return (dict): session data
1106 """ 1238 """
1107 if sid in client.xep_0065_sid_session: 1239 if sid in client.xep_0065_sid_session:
1108 raise exceptions.ConflictError(u'A session with this id already exists !') 1240 raise exceptions.ConflictError(u"A session with this id already exists !")
1109 if requester: 1241 if requester:
1110 session_hash = getSessionHash(client.jid, to_jid, sid) 1242 session_hash = getSessionHash(client.jid, to_jid, sid)
1111 session_data = self._registerHash(client, session_hash, stream_object) 1243 session_data = self._registerHash(client, session_hash, stream_object)
1112 else: 1244 else:
1113 session_hash = getSessionHash(to_jid, client.jid, sid) 1245 session_hash = getSessionHash(to_jid, client.jid, sid)
1114 session_d = defer.Deferred() 1246 session_d = defer.Deferred()
1115 session_d.addBoth(self.killSession, session_hash, sid, client) 1247 session_d.addBoth(self.killSession, session_hash, sid, client)
1116 session_data = client._s5b_sessions[session_hash] = { 1248 session_data = client._s5b_sessions[session_hash] = {
1117 DEFER_KEY: session_d, 1249 DEFER_KEY: session_d,
1118 TIMER_KEY: reactor.callLater(TIMEOUT, self._timeOut, session_hash, client), 1250 TIMER_KEY: reactor.callLater(
1119 } 1251 TIMEOUT, self._timeOut, session_hash, client
1252 ),
1253 }
1120 client.xep_0065_sid_session[sid] = session_data 1254 client.xep_0065_sid_session[sid] = session_data
1121 session_data.update( 1255 session_data.update(
1122 {'id': sid, 1256 {
1123 'peer_jid': to_jid, 1257 "id": sid,
1124 'stream_object': stream_object, 1258 "peer_jid": to_jid,
1125 'hash': session_hash, 1259 "stream_object": stream_object,
1126 }) 1260 "hash": session_hash,
1261 }
1262 )
1127 1263
1128 return session_data 1264 return session_data
1129 1265
1130 def getSession(self, client, session_hash): 1266 def getSession(self, client, session_hash):
1131 """Return session data 1267 """Return session data
1139 See comments below for details 1275 See comments below for details
1140 @return (dict): session data 1276 @return (dict): session data
1141 """ 1277 """
1142 if client is None: 1278 if client is None:
1143 try: 1279 try:
1144 client = self.hash_clients_map[session_hash] 1280 client = self.hash_clients_map[session_hash]
1145 except KeyError as e: 1281 except KeyError as e:
1146 log.warning(u"The requested session doesn't exists !") 1282 log.warning(u"The requested session doesn't exists !")
1147 raise e 1283 raise e
1148 return client._s5b_sessions[session_hash] 1284 return client._s5b_sessions[session_hash]
1149 1285
1165 session_d = defer.Deferred() 1301 session_d = defer.Deferred()
1166 session_d.addBoth(self.killSession, session_hash, None, client) 1302 session_d.addBoth(self.killSession, session_hash, None, client)
1167 session_data = client._s5b_sessions[session_hash] = { 1303 session_data = client._s5b_sessions[session_hash] = {
1168 DEFER_KEY: session_d, 1304 DEFER_KEY: session_d,
1169 TIMER_KEY: reactor.callLater(TIMEOUT, self._timeOut, session_hash, client), 1305 TIMER_KEY: reactor.callLater(TIMEOUT, self._timeOut, session_hash, client),
1170 } 1306 }
1171 1307
1172 if stream_object is not None: 1308 if stream_object is not None:
1173 session_data['stream_object'] = stream_object 1309 session_data["stream_object"] = stream_object
1174 1310
1175 assert session_hash not in self.hash_clients_map 1311 assert session_hash not in self.hash_clients_map
1176 self.hash_clients_map[session_hash] = client 1312 self.hash_clients_map[session_hash] = client
1177 1313
1178 return session_data 1314 return session_data
1179 1315
def associateStreamObject(self, client, session_hash, stream_object):
    """Attach a stream object to an existing session

    The session is retrieved with self.getSession and must not have a
    stream object yet.
    @param client: passed through to getSession
    @param session_hash: hash identifying the session
    @param stream_object: object stored under the "stream_object" key
    """
    session = self.getSession(client, session_hash)
    # a session can only be given its stream object once
    assert "stream_object" not in session
    session.update({"stream_object": stream_object})
1185 1321
1186 def streamQuery(self, iq_elt, client): 1322 def streamQuery(self, iq_elt, client):
1187 log.debug(u"BS stream query") 1323 log.debug(u"BS stream query")
1188 1324
1189 iq_elt.handled = True 1325 iq_elt.handled = True
1190 1326
1191 query_elt = iq_elt.elements(NS_BS, 'query').next() 1327 query_elt = iq_elt.elements(NS_BS, "query").next()
1192 try: 1328 try:
1193 sid = query_elt['sid'] 1329 sid = query_elt["sid"]
1194 except KeyError: 1330 except KeyError:
1195 log.warning(u"Invalid bystreams request received") 1331 log.warning(u"Invalid bystreams request received")
1196 return client.sendError(iq_elt, "bad-request") 1332 return client.sendError(iq_elt, "bad-request")
1197 1333
1198 streamhost_elts = list(query_elt.elements(NS_BS, 'streamhost')) 1334 streamhost_elts = list(query_elt.elements(NS_BS, "streamhost"))
1199 if not streamhost_elts: 1335 if not streamhost_elts:
1200 return client.sendError(iq_elt, "bad-request") 1336 return client.sendError(iq_elt, "bad-request")
1201 1337
1202 try: 1338 try:
1203 session_data = client.xep_0065_sid_session[sid] 1339 session_data = client.xep_0065_sid_session[sid]
1204 except KeyError: 1340 except KeyError:
1205 log.warning(u"Ignoring unexpected BS transfer: {}".format(sid)) 1341 log.warning(u"Ignoring unexpected BS transfer: {}".format(sid))
1206 return client.sendError(iq_elt, 'not-acceptable') 1342 return client.sendError(iq_elt, "not-acceptable")
1207 1343
1208 peer_jid = session_data["peer_jid"] = jid.JID(iq_elt["from"]) 1344 peer_jid = session_data["peer_jid"] = jid.JID(iq_elt["from"])
1209 1345
1210 candidates = [] 1346 candidates = []
1211 nb_sh = len(streamhost_elts) 1347 nb_sh = len(streamhost_elts)
1212 for idx, sh_elt in enumerate(streamhost_elts): 1348 for idx, sh_elt in enumerate(streamhost_elts):
1213 try: 1349 try:
1214 host, port, jid_ = sh_elt['host'], sh_elt['port'], jid.JID(sh_elt['jid']) 1350 host, port, jid_ = sh_elt["host"], sh_elt["port"], jid.JID(sh_elt["jid"])
1215 except KeyError: 1351 except KeyError:
1216 log.warning(u"malformed streamhost element") 1352 log.warning(u"malformed streamhost element")
1217 return client.sendError(iq_elt, "bad-request") 1353 return client.sendError(iq_elt, "bad-request")
1218 priority = nb_sh - idx 1354 priority = nb_sh - idx
1219 if jid_.userhostJID() != peer_jid.userhostJID(): 1355 if jid_.userhostJID() != peer_jid.userhostJID():
1223 candidates.append(Candidate(host, port, type_, priority, jid_)) 1359 candidates.append(Candidate(host, port, type_, priority, jid_))
1224 1360
1225 for candidate in candidates: 1361 for candidate in candidates:
1226 log.info(u"Candidate proposed: {}".format(candidate)) 1362 log.info(u"Candidate proposed: {}".format(candidate))
1227 1363
1228 d = self.getBestCandidate(client, candidates, session_data['hash']) 1364 d = self.getBestCandidate(client, candidates, session_data["hash"])
1229 d.addCallback(self._ackStream, iq_elt, session_data, client) 1365 d.addCallback(self._ackStream, iq_elt, session_data, client)
1230 1366
def _ackStream(self, candidate, iq_elt, session_data, client):
    """Answer a bytestreams request with the candidate which has been selected

    @param candidate(Candidate, None): selected candidate,
        or None if no candidate could be used
    @param iq_elt(domish.Element): <iq> request being answered
    @param session_data(dict): data of the session, its "id" is sent as sid
    @param client: used to send the answer
    @return: result of client.sendError if candidate is None, None otherwise
    """
    if candidate is None:
        log.info("No streamhost candidate worked, we have to end negotiation")
        return client.sendError(iq_elt, "item-not-found")

    log.info(u"We choose: {}".format(candidate))

    # <iq type="result"><query sid="..."><streamhost-used jid="..."/></query></iq>
    response_elt = xmlstream.toResponse(iq_elt, "result")
    bs_query_elt = response_elt.addElement((NS_BS, "query"))
    bs_query_elt["sid"] = session_data["id"]
    used_elt = bs_query_elt.addElement("streamhost-used")
    used_elt["jid"] = candidate.jid.full()
    client.send(response_elt)
1242 1378
1243 1379
class XEP_0065_handler(XMPPHandler):
    """XMPP handler forwarding bytestreams requests to the parent plugin"""

    implements(iwokkel.IDisco)

    def __init__(self, plugin_parent):
        """
        @param plugin_parent: plugin instance; its streamQuery method handles
            the requests and its host attribute is kept as self.host
        """
        self.plugin_parent = plugin_parent
        self.host = self.plugin_parent.host

    def connectionInitialized(self):
        """Register the plugin's streamQuery as observer of BS_REQUEST"""
        self.xmlstream.addObserver(
            BS_REQUEST,
            self.plugin_parent.streamQuery,
            client=self.parent,
        )

    def getDiscoInfo(self, requestor, target, nodeIdentifier=""):
        """Advertise the bytestreams namespace (NS_BS) as a disco feature

        @return (list): a single disco.DiscoFeature
        """
        bs_feature = disco.DiscoFeature(NS_BS)
        return [bs_feature]

    def getDiscoItems(self, requestor, target, nodeIdentifier=""):
        """This handler exposes no disco item

        @return (list): always empty
        """
        return list()