comparison sat/plugins/plugin_xep_0065.py @ 4037:524856bd7b19

massive refactoring to switch from camelCase to snake_case: historically, Libervia (formerly SàT) used camelCase, as PEP 8 allows when extending pre-PEP 8 code, in order to match the coding style of Twisted. However, snake_case is more readable and it is better to follow PEP 8 best practices, so it was decided to move to full snake_case. Because Libervia has a huge codebase, this had left an ugly mix of camelCase and snake_case. To fix that, this patch does a big refactoring, renaming every function and method (including the bridge) that does not come from Twisted or Wokkel to snake_case. This is a massive change and may result in some bugs.
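As a rough illustration of the mechanical rename this commit applies (this is not the project's actual tooling, only a sketch of how such a conversion can be done):

    import re

    def to_snake_case(name: str) -> str:
        # insert an underscore before an uppercase run that follows a lowercase
        # letter or digit, then lowercase the whole identifier
        s1 = re.sub(r"(.)([A-Z][a-z]+)", r"\1_\2", name)
        return re.sub(r"([a-z0-9])([A-Z])", r"\1_\2", s1).lower()

    assert to_snake_case("calculatePriority") == "calculate_priority"
    assert to_snake_case("getSessionHash") == "get_session_hash"
    assert to_snake_case("startTransfer") == "start_transfer"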
author Goffi <goffi@goffi.org>
date Sat, 08 Apr 2023 13:54:42 +0200
parents be6d91572633
children
comparison
4036:c4464d7ae97b 4037:524856bd7b19
184 assert isinstance(jid_, jid.JID) 184 assert isinstance(jid_, jid.JID)
185 self.host, self.port, self.type, self.jid = (host, int(port), type_, jid_) 185 self.host, self.port, self.type, self.jid = (host, int(port), type_, jid_)
186 self.id = id_ if id_ is not None else str(uuid.uuid4()) 186 self.id = id_ if id_ is not None else str(uuid.uuid4())
187 if priority_local: 187 if priority_local:
188 self._local_priority = int(priority) 188 self._local_priority = int(priority)
189 self._priority = self.calculatePriority() 189 self._priority = self.calculate_priority()
190 else: 190 else:
191 self._local_priority = 0 191 self._local_priority = 0
192 self._priority = int(priority) 192 self._priority = int(priority)
193 self.factory = factory 193 self.factory = factory
194 194
229 return False 229 return False
230 230
231 def __ne__(self, other): 231 def __ne__(self, other):
232 return not self.__eq__(other) 232 return not self.__eq__(other)
233 233
234 def calculatePriority(self): 234 def calculate_priority(self):
235 """Calculate candidate priority according to XEP-0260 §2.2 235 """Calculate candidate priority according to XEP-0260 §2.2
236 236
237 237
238 @return (int): priority 238 @return (int): priority
239 """ 239 """
252 def activate(self, client, sid, peer_jid, local_jid): 252 def activate(self, client, sid, peer_jid, local_jid):
253 """Activate the proxy candidate 253 """Activate the proxy candidate
254 254
255 Send activation request as explained in XEP-0065 § 6.3.5 255 Send activation request as explained in XEP-0065 § 6.3.5
256 Must only be used with proxy candidates 256 Must only be used with proxy candidates
257 @param sid(unicode): session id (same as for getSessionHash) 257 @param sid(unicode): session id (same as for get_session_hash)
258 @param peer_jid(jid.JID): jid of the other peer 258 @param peer_jid(jid.JID): jid of the other peer
259 @return (D(domish.Element)): IQ result (or error) 259 @return (D(domish.Element)): IQ result (or error)
260 """ 260 """
261 assert self.type == XEP_0065.TYPE_PROXY 261 assert self.type == XEP_0065.TYPE_PROXY
262 iq_elt = client.IQ() 262 iq_elt = client.IQ()
265 query_elt = iq_elt.addElement((NS_BS, "query")) 265 query_elt = iq_elt.addElement((NS_BS, "query"))
266 query_elt["sid"] = sid 266 query_elt["sid"] = sid
267 query_elt.addElement("activate", content=peer_jid.full()) 267 query_elt.addElement("activate", content=peer_jid.full())
268 return iq_elt.send() 268 return iq_elt.send()
269 269
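The activation payload built above corresponds to the XEP-0065 §6.3.5 <query sid='...'><activate>peer jid</activate></query> element; a self-contained sketch of the same payload using Twisted's domish (the NS_BS value and the session id are assumptions for illustration):

    from twisted.words.xish import domish

    NS_BS = "http://jabber.org/protocol/bytestreams"  # assumed value of the module constant

    query_elt = domish.Element((NS_BS, "query"))
    query_elt["sid"] = "vxf9n471bn46"                 # example session id
    query_elt.addElement("activate", content="target@example.org/Foo")
    # query_elt.toXml() yields roughly:
    # <query xmlns='http://jabber.org/protocol/bytestreams' sid='vxf9n471bn46'>
    #   <activate>target@example.org/Foo</activate>
    # </query>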
270 def startTransfer(self, session_hash=None): 270 def start_transfer(self, session_hash=None):
271 if self.type == XEP_0065.TYPE_PROXY: 271 if self.type == XEP_0065.TYPE_PROXY:
272 chunk_size = 4096 # Prosody's proxy rejects bigger chunks by default 272 chunk_size = 4096 # Prosody's proxy rejects bigger chunks by default
273 else: 273 else:
274 chunk_size = None 274 chunk_size = None
275 self.factory.startTransfer(session_hash, chunk_size=chunk_size) 275 self.factory.start_transfer(session_hash, chunk_size=chunk_size)
276 276
277 277
278 def getSessionHash(requester_jid, target_jid, sid): 278 def get_session_hash(requester_jid, target_jid, sid):
279 """Calculate SHA1 Hash according to XEP-0065 §5.3.2 279 """Calculate SHA1 Hash according to XEP-0065 §5.3.2
280 280
281 @param requester_jid(jid.JID): jid of the requester (the one which activates the proxy) 281 @param requester_jid(jid.JID): jid of the requester (the one which activates the proxy)
282 @param target_jid(jid.JID): jid of the target 282 @param target_jid(jid.JID): jid of the target
283 @param sid(unicode): session id 283 @param sid(unicode): session id
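For context, XEP-0065 §5.3.2 defines this value as the hex-encoded SHA-1 of the concatenation SID + Requester JID + Target JID; the function body is elided in this hunk, so the following is only an illustrative sketch of that definition (the helper name is hypothetical):

    import hashlib

    def xep_0065_session_hash(requester_jid, target_jid, sid):
        # hex-encoded SHA-1 of SID + Requester JID + Target JID (XEP-0065 §5.3.2)
        data = (sid + requester_jid.full() + target_jid.full()).encode("utf-8")
        return hashlib.sha1(data).hexdigest()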
332 if self.server_mode: 332 if self.server_mode:
333 return self.factory.getSession(self._session_hash) 333 return self.factory.getSession(self._session_hash)
334 else: 334 else:
335 return self.factory.getSession() 335 return self.factory.getSession()
336 336
337 def _startNegotiation(self): 337 def _start_negotiation(self):
338 log.debug("starting negotiation (client mode)") 338 log.debug("starting negotiation (client mode)")
339 self.state = STATE_CLIENT_AUTH 339 self.state = STATE_CLIENT_AUTH
340 self.transport.write(struct.pack("!3B", SOCKS5_VER, 1, AUTHMECH_ANON)) 340 self.transport.write(struct.pack("!3B", SOCKS5_VER, 1, AUTHMECH_ANON))
341 341
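For readers less familiar with SOCKS5: the three bytes written above are the RFC 1928 client greeting, i.e. protocol version 5, one authentication method offered, and method 0x00 (no authentication), which can be checked quickly:

    import struct

    SOCKS5_VER = 0x05    # values assumed to match the module constants above
    AUTHMECH_ANON = 0x00

    # version 5, one method offered, method 0x00 ("no authentication required")
    assert struct.pack("!3B", SOCKS5_VER, 1, AUTHMECH_ANON) == b"\x05\x01\x00"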
342 def _parseNegotiation(self): 342 def _parse_negotiation(self):
343 try: 343 try:
344 # Parse out data 344 # Parse out data
345 ver, nmethod = struct.unpack("!BB", self.buf[:2]) 345 ver, nmethod = struct.unpack("!BB", self.buf[:2])
346 methods = struct.unpack("%dB" % nmethod, self.buf[2 : nmethod + 2]) 346 methods = struct.unpack("%dB" % nmethod, self.buf[2 : nmethod + 2])
347 347
371 self.transport.write(struct.pack("!BB", SOCKS5_VER, AUTHMECH_INVALID)) 371 self.transport.write(struct.pack("!BB", SOCKS5_VER, AUTHMECH_INVALID))
372 self.transport.loseConnection() 372 self.transport.loseConnection()
373 except struct.error: 373 except struct.error:
374 pass 374 pass
375 375
376 def _parseUserPass(self): 376 def _parse_user_pass(self):
377 try: 377 try:
378 # Parse out data 378 # Parse out data
379 ver, ulen = struct.unpack("BB", self.buf[:2]) 379 ver, ulen = struct.unpack("BB", self.buf[:2])
380 uname, = struct.unpack("%ds" % ulen, self.buf[2 : ulen + 2]) 380 uname, = struct.unpack("%ds" % ulen, self.buf[2 : ulen + 2])
381 plen, = struct.unpack("B", self.buf[ulen + 2 : ulen + 3]) 381 plen, = struct.unpack("B", self.buf[ulen + 2 : ulen + 3])
382 password, = struct.unpack("%ds" % plen, self.buf[ulen + 3 : ulen + 3 + plen]) 382 password, = struct.unpack("%ds" % plen, self.buf[ulen + 3 : ulen + 3 + plen])
383 # Trim off the front of the buffer 383 # Trim off the front of the buffer
384 self.buf = self.buf[3 + ulen + plen :] 384 self.buf = self.buf[3 + ulen + plen :]
385 # Fire event to authenticate user 385 # Fire event to authenticate user
386 if self.authenticateUserPass(uname, password): 386 if self.authenticate_user_pass(uname, password):
387 # Signal success 387 # Signal success
388 self.state = STATE_REQUEST 388 self.state = STATE_REQUEST
389 self.transport.write(struct.pack("!BB", SOCKS5_VER, 0x00)) 389 self.transport.write(struct.pack("!BB", SOCKS5_VER, 0x00))
390 else: 390 else:
391 # Signal failure 391 # Signal failure
392 self.transport.write(struct.pack("!BB", SOCKS5_VER, 0x01)) 392 self.transport.write(struct.pack("!BB", SOCKS5_VER, 0x01))
393 self.transport.loseConnection() 393 self.transport.loseConnection()
394 except struct.error: 394 except struct.error:
395 pass 395 pass
396 396
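The fields parsed above follow the RFC 1929 username/password sub-negotiation layout (VER, ULEN, UNAME, PLEN, PASSWD); a small self-contained round-trip check of that layout, using slicing so it also works on Python 3, where indexing bytes yields an int:

    import struct

    # RFC 1929 layout: VER | ULEN | UNAME | PLEN | PASSWD
    buf = struct.pack("!BB5sB6s", 0x01, 5, b"alice", 6, b"secret")
    ver, ulen = struct.unpack("BB", buf[:2])
    uname, = struct.unpack("%ds" % ulen, buf[2 : ulen + 2])
    plen, = struct.unpack("B", buf[ulen + 2 : ulen + 3])
    password, = struct.unpack("%ds" % plen, buf[ulen + 3 : ulen + 3 + plen])
    assert (ver, uname, password) == (0x01, b"alice", b"secret")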
397 def sendErrorReply(self, errorcode): 397 def send_error_reply(self, errorcode):
398 # Any other address types are not supported 398 # Any other address types are not supported
399 result = struct.pack("!BBBBIH", SOCKS5_VER, errorcode, 0, 1, 0, 0) 399 result = struct.pack("!BBBBIH", SOCKS5_VER, errorcode, 0, 1, 0, 0)
400 self.transport.write(result) 400 self.transport.write(result)
401 self.transport.loseConnection() 401 self.transport.loseConnection()
402 402
405 # Parse out data and trim buffer accordingly 405 # Parse out data and trim buffer accordingly
406 ver, cmd, rsvd, self.addressType = struct.unpack("!BBBB", self.buf[:4]) 406 ver, cmd, rsvd, self.addressType = struct.unpack("!BBBB", self.buf[:4])
407 407
408 # Ensure we actually support the requested address type 408 # Ensure we actually support the requested address type
409 if self.addressType not in self.supportedAddrs: 409 if self.addressType not in self.supportedAddrs:
410 self.sendErrorReply(REPLY_ADDR_NOT_SUPPORTED) 410 self.send_error_reply(REPLY_ADDR_NOT_SUPPORTED)
411 return 411 return
412 412
413 # Deal with addresses 413 # Deal with addresses
414 if self.addressType == ADDR_IPV4: 414 if self.addressType == ADDR_IPV4:
415 addr, port = struct.unpack("!IH", self.buf[4:10]) 415 addr, port = struct.unpack("!IH", self.buf[4:10])
418 nlen = self.buf[4] 418 nlen = self.buf[4]
419 addr, port = struct.unpack("!%dsH" % nlen, self.buf[5:]) 419 addr, port = struct.unpack("!%dsH" % nlen, self.buf[5:])
420 self.buf = self.buf[7 + len(addr) :] 420 self.buf = self.buf[7 + len(addr) :]
421 else: 421 else:
422 # Any other address types are not supported 422 # Any other address types are not supported
423 self.sendErrorReply(REPLY_ADDR_NOT_SUPPORTED) 423 self.send_error_reply(REPLY_ADDR_NOT_SUPPORTED)
424 return 424 return
425 425
426 # Ensure command is supported 426 # Ensure command is supported
427 if cmd not in self.enabledCommands: 427 if cmd not in self.enabledCommands:
428 # Send a not supported error 428 # Send a not supported error
429 self.sendErrorReply(REPLY_CMD_NOT_SUPPORTED) 429 self.send_error_reply(REPLY_CMD_NOT_SUPPORTED)
430 return 430 return
431 431
432 # Process the command 432 # Process the command
433 if cmd == CMD_CONNECT: 433 if cmd == CMD_CONNECT:
434 self.connectRequested(addr, port) 434 self.connect_requested(addr, port)
435 elif cmd == CMD_BIND: 435 elif cmd == CMD_BIND:
436 self.bindRequested(addr, port) 436 self.bind_requested(addr, port)
437 else: 437 else:
438 # Any other command is not supported 438 # Any other command is not supported
439 self.sendErrorReply(REPLY_CMD_NOT_SUPPORTED) 439 self.send_error_reply(REPLY_CMD_NOT_SUPPORTED)
440 440
441 except struct.error: 441 except struct.error:
442 # The buffer is probably not complete, we need to wait more 442 # The buffer is probably not complete, we need to wait more
443 return None 443 return None
444 444
445 def _makeRequest(self): 445 def _make_request(self):
446 hash_ = self._session_hash.encode('utf-8') 446 hash_ = self._session_hash.encode('utf-8')
447 request = struct.pack( 447 request = struct.pack(
448 "!5B%dsH" % len(hash_), 448 "!5B%dsH" % len(hash_),
449 SOCKS5_VER, 449 SOCKS5_VER,
450 CMD_CONNECT, 450 CMD_CONNECT,
455 0, 455 0,
456 ) 456 )
457 self.transport.write(request) 457 self.transport.write(request)
458 self.state = STATE_CLIENT_REQUEST 458 self.state = STATE_CLIENT_REQUEST
459 459
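The middle pack arguments are elided from this hunk; per XEP-0065 §6.3.2 the client's CONNECT request uses the domain-name address type with DST.ADDR set to the session hash and DST.PORT set to zero, so the full request presumably looks roughly like this (hypothetical reconstruction, field values assumed):

    import hashlib
    import struct

    # Hypothetical reconstruction: VER=5, CMD=CONNECT(1), RSV=0, ATYP=domain(3),
    # length of the hash, hash as DST.ADDR, DST.PORT=0
    hash_ = hashlib.sha1(b"sid + requester + target").hexdigest().encode("utf-8")
    request = struct.pack(
        "!5B%dsH" % len(hash_),
        0x05, 0x01, 0x00, 0x03, len(hash_),
        hash_,
        0,
    )
    assert len(request) == 5 + len(hash_) + 2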
460 def _parseRequestReply(self): 460 def _parse_request_reply(self):
461 try: 461 try:
462 ver, rep, rsvd, self.addressType = struct.unpack("!BBBB", self.buf[:4]) 462 ver, rep, rsvd, self.addressType = struct.unpack("!BBBB", self.buf[:4])
463 # Ensure we actually support the requested address type 463 # Ensure we actually support the requested address type
464 if self.addressType not in self.supportedAddrs: 464 if self.addressType not in self.supportedAddrs:
465 self.sendErrorReply(REPLY_ADDR_NOT_SUPPORTED) 465 self.send_error_reply(REPLY_ADDR_NOT_SUPPORTED)
466 return 466 return
467 467
468 # Deal with addresses 468 # Deal with addresses
469 if self.addressType == ADDR_IPV4: 469 if self.addressType == ADDR_IPV4:
470 addr, port = struct.unpack("!IH", self.buf[4:10]) 470 addr, port = struct.unpack("!IH", self.buf[4:10])
473 nlen = self.buf[4] 473 nlen = self.buf[4]
474 addr, port = struct.unpack("!%dsH" % nlen, self.buf[5:]) 474 addr, port = struct.unpack("!%dsH" % nlen, self.buf[5:])
475 self.buf = self.buf[7 + len(addr) :] 475 self.buf = self.buf[7 + len(addr) :]
476 else: 476 else:
477 # Any other address types are not supported 477 # Any other address types are not supported
478 self.sendErrorReply(REPLY_ADDR_NOT_SUPPORTED) 478 self.send_error_reply(REPLY_ADDR_NOT_SUPPORTED)
479 return 479 return
480 480
481 # Ensure reply is OK 481 # Ensure reply is OK
482 if rep != REPLY_SUCCESS: 482 if rep != REPLY_SUCCESS:
483 self.loseConnection() 483 self.loseConnection()
495 "Socks5 connectionMade (mode = {})".format( 495 "Socks5 connectionMade (mode = {})".format(
496 "server" if self.state == STATE_INITIAL else "client" 496 "server" if self.state == STATE_INITIAL else "client"
497 ) 497 )
498 ) 498 )
499 if self.state == STATE_CLIENT_INITIAL: 499 if self.state == STATE_CLIENT_INITIAL:
500 self._startNegotiation() 500 self._start_negotiation()
501 501
502 def connectRequested(self, addr, port): 502 def connect_requested(self, addr, port):
503 # Check that this session is expected 503 # Check that this session is expected
504 if not self.factory.addToSession(addr.decode('utf-8'), self): 504 if not self.factory.add_to_session(addr.decode('utf-8'), self):
505 log.warning( 505 log.warning(
506 "Unexpected connection request received from {host}".format( 506 "Unexpected connection request received from {host}".format(
507 host=self.transport.getPeer().host 507 host=self.transport.getPeer().host
508 ) 508 )
509 ) 509 )
510 self.sendErrorReply(REPLY_CONN_REFUSED) 510 self.send_error_reply(REPLY_CONN_REFUSED)
511 return 511 return
512 self._session_hash = addr.decode('utf-8') 512 self._session_hash = addr.decode('utf-8')
513 self.connectCompleted(addr, 0) 513 self.connect_completed(addr, 0)
514 514
515 def startTransfer(self, chunk_size): 515 def start_transfer(self, chunk_size):
516 """Callback called when the result iq is received 516 """Callback called when the result iq is received
517 517
518 @param chunk_size(None, int): size of the buffer, or None for default 518 @param chunk_size(None, int): size of the buffer, or None for default
519 """ 519 """
520 self.active = True 520 self.active = True
521 if chunk_size is not None: 521 if chunk_size is not None:
522 self.CHUNK_SIZE = chunk_size 522 self.CHUNK_SIZE = chunk_size
523 log.debug("Starting file transfer") 523 log.debug("Starting file transfer")
524 d = self.stream_object.startStream(self.transport) 524 d = self.stream_object.start_stream(self.transport)
525 d.addCallback(self.streamFinished) 525 d.addCallback(self.stream_finished)
526 526
527 def streamFinished(self, d): 527 def stream_finished(self, d):
528 log.info(_("File transfer completed, closing connection")) 528 log.info(_("File transfer completed, closing connection"))
529 self.transport.loseConnection() 529 self.transport.loseConnection()
530 530
531 def connectCompleted(self, remotehost, remoteport): 531 def connect_completed(self, remotehost, remoteport):
532 if self.addressType == ADDR_IPV4: 532 if self.addressType == ADDR_IPV4:
533 result = struct.pack( 533 result = struct.pack(
534 "!BBBBIH", SOCKS5_VER, REPLY_SUCCESS, 0, 1, remotehost, remoteport 534 "!BBBBIH", SOCKS5_VER, REPLY_SUCCESS, 0, 1, remotehost, remoteport
535 ) 535 )
536 elif self.addressType == ADDR_DOMAINNAME: 536 elif self.addressType == ADDR_DOMAINNAME:
545 remoteport, 545 remoteport,
546 ) 546 )
547 self.transport.write(result) 547 self.transport.write(result)
548 self.state = STATE_READY 548 self.state = STATE_READY
549 549
550 def bindRequested(self, addr, port): 550 def bind_requested(self, addr, port):
551 pass 551 pass
552 552
553 def authenticateUserPass(self, user, passwd): 553 def authenticate_user_pass(self, user, passwd):
554 # FIXME: implement authentication and remove the debug log that prints the password 554 # FIXME: implement authentication and remove the debug log that prints the password
555 log.debug("User/pass: %s/%s" % (user, passwd)) 555 log.debug("User/pass: %s/%s" % (user, passwd))
556 return True 556 return True
557 557
558 def dataReceived(self, buf): 558 def dataReceived(self, buf):
564 self.getSession()[TIMER_KEY].cancel() 564 self.getSession()[TIMER_KEY].cancel()
565 return 565 return
566 566
567 self.buf = self.buf + buf 567 self.buf = self.buf + buf
568 if self.state == STATE_INITIAL: 568 if self.state == STATE_INITIAL:
569 self._parseNegotiation() 569 self._parse_negotiation()
570 if self.state == STATE_AUTH_USERPASS: 570 if self.state == STATE_AUTH_USERPASS:
571 self._parseUserPass() 571 self._parse_user_pass()
572 if self.state == STATE_REQUEST: 572 if self.state == STATE_REQUEST:
573 self._parseRequest() 573 self._parseRequest()
574 if self.state == STATE_CLIENT_REQUEST: 574 if self.state == STATE_CLIENT_REQUEST:
575 self._parseRequestReply() 575 self._parse_request_reply()
576 if self.state == STATE_CLIENT_AUTH: 576 if self.state == STATE_CLIENT_AUTH:
577 ver, method = struct.unpack("!BB", buf) 577 ver, method = struct.unpack("!BB", buf)
578 self.buf = self.buf[2:] 578 self.buf = self.buf[2:]
579 if ver != SOCKS5_VER or method != AUTHMECH_ANON: 579 if ver != SOCKS5_VER or method != AUTHMECH_ANON:
580 self.transport.loseConnection() 580 self.transport.loseConnection()
581 else: 581 else:
582 self._makeRequest() 582 self._make_request()
583 583
584 def connectionLost(self, reason): 584 def connectionLost(self, reason):
585 log.debug("Socks5 connection lost: {}".format(reason.value)) 585 log.debug("Socks5 connection lost: {}".format(reason.value))
586 if self.state != STATE_READY: 586 if self.state != STATE_READY:
587 self.connection.errback(reason) 587 self.connection.errback(reason)
589 try: 589 try:
590 session_hash = self._session_hash 590 session_hash = self._session_hash
591 except AttributeError: 591 except AttributeError:
592 log.debug("no session has been received yet") 592 log.debug("no session has been received yet")
593 else: 593 else:
594 self.factory.removeFromSession(session_hash, self, reason) 594 self.factory.remove_from_session(session_hash, self, reason)
595 595
596 596
597 class Socks5ServerFactory(protocol.ServerFactory): 597 class Socks5ServerFactory(protocol.ServerFactory):
598 protocol = SOCKSv5 598 protocol = SOCKSv5
599 599
604 self.parent = parent 604 self.parent = parent
605 605
606 def getSession(self, session_hash): 606 def getSession(self, session_hash):
607 return self.parent.getSession(None, session_hash) 607 return self.parent.getSession(None, session_hash)
608 608
609 def startTransfer(self, session_hash, chunk_size=None): 609 def start_transfer(self, session_hash, chunk_size=None):
610 session = self.getSession(session_hash) 610 session = self.getSession(session_hash)
611 try: 611 try:
612 protocol = session["protocols"][0] 612 protocol = session["protocols"][0]
613 except (KeyError, IndexError): 613 except (KeyError, IndexError):
614 log.error("Can't start file transfer, can't find protocol") 614 log.error("Can't start file transfer, can't find protocol")
615 else: 615 else:
616 session[TIMER_KEY].cancel() 616 session[TIMER_KEY].cancel()
617 protocol.startTransfer(chunk_size) 617 protocol.start_transfer(chunk_size)
618 618
619 def addToSession(self, session_hash, protocol): 619 def add_to_session(self, session_hash, protocol):
620 """Check is session_hash is valid, and associate protocol with it 620 """Check is session_hash is valid, and associate protocol with it
621 621
622 the session will be associated to the corresponding candidate 622 the session will be associated to the corresponding candidate
623 @param session_hash(str): hash of the session 623 @param session_hash(str): hash of the session
624 @param protocol(SOCKSv5): protocol instance 624 @param protocol(SOCKSv5): protocol instance
631 return False 631 return False
632 else: 632 else:
633 session_data.setdefault("protocols", []).append(protocol) 633 session_data.setdefault("protocols", []).append(protocol)
634 return True 634 return True
635 635
636 def removeFromSession(self, session_hash, protocol, reason): 636 def remove_from_session(self, session_hash, protocol, reason):
637 """Remove a protocol from session_data 637 """Remove a protocol from session_data
638 638
639 There can be several protocol instances while candidates are tried; they 639 There can be several protocol instances while candidates are tried; they
640 are removed when the candidate connection is closed 640 are removed when the candidate connection is closed
641 @param session_hash(str): hash of the session 641 @param session_hash(str): hash of the session
681 self.connector.disconnect() 681 self.connector.disconnect()
682 682
683 def getSession(self): 683 def getSession(self):
684 return self.session 684 return self.session
685 685
686 def startTransfer(self, __=None, chunk_size=None): 686 def start_transfer(self, __=None, chunk_size=None):
687 self.session[TIMER_KEY].cancel() 687 self.session[TIMER_KEY].cancel()
688 self._protocol_instance.startTransfer(chunk_size) 688 self._protocol_instance.start_transfer(chunk_size)
689 689
690 def clientConnectionFailed(self, connector, reason): 690 def clientConnectionFailed(self, connector, reason):
691 log.debug("Connection failed") 691 log.debug("Connection failed")
692 self.connection.errback(reason) 692 self.connection.errback(reason)
693 693
739 log.debug("NAT Port plugin not available") 739 log.debug("NAT Port plugin not available")
740 self._np = None 740 self._np = None
741 741
742 # parameters 742 # parameters
743 # XXX: params are not used for now, but they may be used in the future to force proxy/IP 743 # XXX: params are not used for now, but they may be used in the future to force proxy/IP
744 # host.memory.updateParams(PARAMS) 744 # host.memory.update_params(PARAMS)
745 745
746 def getHandler(self, client): 746 def get_handler(self, client):
747 return XEP_0065_handler(self) 747 return XEP_0065_handler(self)
748 748
749 def profileConnected(self, client): 749 def profile_connected(self, client):
750 client.xep_0065_sid_session = {} # key: stream_id, value: session_data(dict) 750 client.xep_0065_sid_session = {} # key: stream_id, value: session_data(dict)
751 client._s5b_sessions = {} 751 client._s5b_sessions = {}
752 752
753 def getSessionHash(self, from_jid, to_jid, sid): 753 def get_session_hash(self, from_jid, to_jid, sid):
754 return getSessionHash(from_jid, to_jid, sid) 754 return get_session_hash(from_jid, to_jid, sid)
755 755
756 def getSocks5ServerFactory(self): 756 def get_socks_5_server_factory(self):
757 """Return server factory 757 """Return server factory
758 758
759 The server is created if it doesn't exist yet 759 The server is created if it doesn't exist yet
760 self._server_factory_port is set on server creation 760 self._server_factory_port is set on server creation
761 """ 761 """
783 ) 783 )
784 ) 784 )
785 return self._server_factory 785 return self._server_factory
786 786
787 @defer.inlineCallbacks 787 @defer.inlineCallbacks
788 def getProxy(self, client, local_jid): 788 def get_proxy(self, client, local_jid):
789 """Return the proxy available for this profile 789 """Return the proxy available for this profile
790 790
791 cache is used between clients using the same server 791 cache is used between clients using the same server
792 @param local_jid(jid.JID): same as for [getCandidates] 792 @param local_jid(jid.JID): same as for [get_candidates]
793 @return ((D)(ProxyInfos, None)): Found proxy infos, 793 @return ((D)(ProxyInfos, None)): Found proxy infos,
794 or None if no acceptable proxy is found 794 or None if no acceptable proxy is found
795 @raise exceptions.NotFound: no Proxy found 795 @raise exceptions.NotFound: no Proxy found
796 """ 796 """
797 797
805 defer.returnValue(self._cache_proxies[server]) 805 defer.returnValue(self._cache_proxies[server])
806 except KeyError: 806 except KeyError:
807 pass 807 pass
808 try: 808 try:
809 proxy = ( 809 proxy = (
810 yield self.host.findServiceEntities(client, "proxy", "bytestreams") 810 yield self.host.find_service_entities(client, "proxy", "bytestreams")
811 ).pop() 811 ).pop()
812 except (defer.CancelledError, StopIteration, KeyError): 812 except (defer.CancelledError, StopIteration, KeyError):
813 notFound(server) 813 notFound(server)
814 iq_elt = client.IQ("get") 814 iq_elt = client.IQ("get")
815 iq_elt["from"] = local_jid.full() 815 iq_elt["from"] = local_jid.full()
842 proxy_infos = self._cache_proxies[server] = ProxyInfos(host, jid_, port) 842 proxy_infos = self._cache_proxies[server] = ProxyInfos(host, jid_, port)
843 log.info("Proxy found: {}".format(proxy_infos)) 843 log.info("Proxy found: {}".format(proxy_infos))
844 defer.returnValue(proxy_infos) 844 defer.returnValue(proxy_infos)
845 845
846 @defer.inlineCallbacks 846 @defer.inlineCallbacks
847 def _getNetworkData(self, client): 847 def _get_network_data(self, client):
848 """Retrieve information about network 848 """Retrieve information about network
849 849
850 @param client: %(doc_client)s 850 @param client: %(doc_client)s
851 @return (D(tuple[local_port, external_port, local_ips, external_ip])): network data 851 @return (D(tuple[local_port, external_port, local_ips, external_ip])): network data
852 """ 852 """
853 self.getSocks5ServerFactory() 853 self.get_socks_5_server_factory()
854 local_port = self._server_factory_port 854 local_port = self._server_factory_port
855 external_ip = yield self._ip.getExternalIP(client) 855 external_ip = yield self._ip.get_external_ip(client)
856 local_ips = yield self._ip.getLocalIPs(client) 856 local_ips = yield self._ip.get_local_i_ps(client)
857 857
858 if external_ip is not None and self._external_port is None: 858 if external_ip is not None and self._external_port is None:
859 if external_ip != local_ips[0]: 859 if external_ip != local_ips[0]:
860 log.info("We are probably behind a NAT") 860 log.info("We are probably behind a NAT")
861 if self._np is None: 861 if self._np is None:
862 log.warning("NAT port plugin not available, we can't map port") 862 log.warning("NAT port plugin not available, we can't map port")
863 else: 863 else:
864 ext_port = yield self._np.mapPort( 864 ext_port = yield self._np.map_port(
865 local_port, desc="SaT socks5 stream" 865 local_port, desc="SaT socks5 stream"
866 ) 866 )
867 if ext_port is None: 867 if ext_port is None:
868 log.warning("Can't map NAT port") 868 log.warning("Can't map NAT port")
869 else: 869 else:
870 self._external_port = ext_port 870 self._external_port = ext_port
871 871
872 defer.returnValue((local_port, self._external_port, local_ips, external_ip)) 872 defer.returnValue((local_port, self._external_port, local_ips, external_ip))
873 873
874 @defer.inlineCallbacks 874 @defer.inlineCallbacks
875 def getCandidates(self, client, local_jid): 875 def get_candidates(self, client, local_jid):
876 """Return a list of our stream candidates 876 """Return a list of our stream candidates
877 877
878 @param local_jid(jid.JID): jid to use as local jid 878 @param local_jid(jid.JID): jid to use as local jid
879 This is needed for clients which can be addressed with a different jid than 879 This is needed for clients which can be addressed with a different jid than
880 client.jid if a local part is used (e.g. piotr@file.example.net where 880 client.jid if a local part is used (e.g. piotr@file.example.net where
881 client.jid would be file.example.net) 881 client.jid would be file.example.net)
882 @return (D(list[Candidate])): list of candidates, ordered by priority 882 @return (D(list[Candidate])): list of candidates, ordered by priority
883 """ 883 """
884 server_factory = yield self.getSocks5ServerFactory() 884 server_factory = yield self.get_socks_5_server_factory()
885 local_port, ext_port, local_ips, external_ip = yield self._getNetworkData(client) 885 local_port, ext_port, local_ips, external_ip = yield self._get_network_data(client)
886 try: 886 try:
887 proxy = yield self.getProxy(client, local_jid) 887 proxy = yield self.get_proxy(client, local_jid)
888 except exceptions.NotFound: 888 except exceptions.NotFound:
889 proxy = None 889 proxy = None
890 890
891 # it's time to gather the candidates 891 # it's time to gather the candidates
892 candidates = [] 892 candidates = []
948 948
949 # should be already sorted, but just in case the priorities get weird 949 # should be already sorted, but just in case the priorities get weird
950 candidates.sort(key=lambda c: c.priority, reverse=True) 950 candidates.sort(key=lambda c: c.priority, reverse=True)
951 defer.returnValue(candidates) 951 defer.returnValue(candidates)
952 952
953 def _addConnector(self, connector, candidate): 953 def _add_connector(self, connector, candidate):
954 """Add connector used to connect to candidate, and return client factory's connection Deferred 954 """Add connector used to connect to candidate, and return client factory's connection Deferred
955 955
956 the connector can be used to disconnect the candidate, and returning the factory's connection Deferred allows waiting for connection completion 956 the connector can be used to disconnect the candidate, and returning the factory's connection Deferred allows waiting for connection completion
957 @param connector: a connector implementing IConnector 957 @param connector: a connector implementing IConnector
958 @param candidate(Candidate): candidate linked to the connector 958 @param candidate(Candidate): candidate linked to the connector
959 @return (D): Deferred fired when factory connection is done or has failed 959 @return (D): Deferred fired when factory connection is done or has failed
960 """ 960 """
961 candidate.factory.connector = connector 961 candidate.factory.connector = connector
962 return candidate.factory.connection 962 return candidate.factory.connection
963 963
964 def connectCandidate( 964 def connect_candidate(
965 self, client, candidate, session_hash, peer_session_hash=None, delay=None 965 self, client, candidate, session_hash, peer_session_hash=None, delay=None
966 ): 966 ):
967 """Connect to a candidate 967 """Connect to a candidate
968 968
969 Connection will be done with a Socks5ClientFactory 969 Connection will be done with a Socks5ClientFactory
973 @param peer_session_hash(unicode, None): hash used with the peer 973 @param peer_session_hash(unicode, None): hash used with the peer
974 None to use session_hash. 974 None to use session_hash.
975 None must be used in 2 cases: 975 None must be used in 2 cases:
976 - when XEP-0065 is used with XEP-0096 976 - when XEP-0065 is used with XEP-0096
977 - when a peer connects to a proxy *it proposed itself* 977 - when a peer connects to a proxy *it proposed itself*
978 in practice, peer_session_hash is only used by tryCandidates 978 in practice, peer_session_hash is only used by try_candidates
979 @param delay(None, float): optional delay to wait before connection, in seconds 979 @param delay(None, float): optional delay to wait before connection, in seconds
980 @return (D): Deferred fired when TCP connection + Socks5 connection is done 980 @return (D): Deferred fired when TCP connection + Socks5 connection is done
981 """ 981 """
982 if peer_session_hash is None: 982 if peer_session_hash is None:
983 # for XEP-0065, only one hash is needed 983 # for XEP-0065, only one hash is needed
988 if delay is None: 988 if delay is None:
989 d = defer.succeed(candidate.host) 989 d = defer.succeed(candidate.host)
990 else: 990 else:
991 d = sat_defer.DelayedDeferred(delay, candidate.host) 991 d = sat_defer.DelayedDeferred(delay, candidate.host)
992 d.addCallback(reactor.connectTCP, candidate.port, factory) 992 d.addCallback(reactor.connectTCP, candidate.port, factory)
993 d.addCallback(self._addConnector, candidate) 993 d.addCallback(self._add_connector, candidate)
994 return d 994 return d
995 995
996 def tryCandidates( 996 def try_candidates(
997 self, 997 self,
998 client, 998 client,
999 candidates, 999 candidates,
1000 session_hash, 1000 session_hash,
1001 peer_session_hash, 1001 peer_session_hash,
1006 1006
1007 for candidate in candidates: 1007 for candidate in candidates:
1008 delay = CANDIDATE_DELAY * len(defers_list) 1008 delay = CANDIDATE_DELAY * len(defers_list)
1009 if candidate.type == XEP_0065.TYPE_PROXY: 1009 if candidate.type == XEP_0065.TYPE_PROXY:
1010 delay += CANDIDATE_DELAY_PROXY 1010 delay += CANDIDATE_DELAY_PROXY
1011 d = self.connectCandidate( 1011 d = self.connect_candidate(
1012 client, candidate, session_hash, peer_session_hash, delay 1012 client, candidate, session_hash, peer_session_hash, delay
1013 ) 1013 )
1014 if connection_cb is not None: 1014 if connection_cb is not None:
1015 d.addCallback( 1015 d.addCallback(
1016 lambda __, candidate=candidate, client=client: connection_cb( 1016 lambda __, candidate=candidate, client=client: connection_cb(
1021 d.addErrback(connection_eb, client, candidate) 1021 d.addErrback(connection_eb, client, candidate)
1022 defers_list.append(d) 1022 defers_list.append(d)
1023 1023
1024 return defers_list 1024 return defers_list
1025 1025
1026 def getBestCandidate(self, client, candidates, session_hash, peer_session_hash=None): 1026 def get_best_candidate(self, client, candidates, session_hash, peer_session_hash=None):
1027 """Get best candidate (according to priority) which can connect 1027 """Get best candidate (according to priority) which can connect
1028 1028
1029 @param candidates(iterable[Candidate]): candidates to test 1029 @param candidates(iterable[Candidate]): candidates to test
1030 @param session_hash(unicode): hash of the session 1030 @param session_hash(unicode): hash of the session
1031 hash is the same as hostname computed in XEP-0065 § 5.3.2 #1 1031 hash is the same as hostname computed in XEP-0065 § 5.3.2 #1
1033 only useful for XEP-0260, must be None for XEP-0065 streamhost candidates 1033 only useful for XEP-0260, must be None for XEP-0065 streamhost candidates
1034 @return (D(None, Candidate)): best candidate or None if none can connect 1034 @return (D(None, Candidate)): best candidate or None if none can connect
1035 """ 1035 """
1036 defer_candidates = None 1036 defer_candidates = None
1037 1037
1038 def connectionCb(client, candidate): 1038 def connection_cb(client, candidate):
1039 log.info("Connection of {} successful".format(str(candidate))) 1039 log.info("Connection of {} successful".format(str(candidate)))
1040 for idx, other_candidate in enumerate(candidates): 1040 for idx, other_candidate in enumerate(candidates):
1041 try: 1041 try:
1042 if other_candidate.priority < candidate.priority: 1042 if other_candidate.priority < candidate.priority:
1043 log.debug("Cancelling {}".format(other_candidate)) 1043 log.debug("Cancelling {}".format(other_candidate))
1044 defer_candidates[idx].cancel() 1044 defer_candidates[idx].cancel()
1045 except AttributeError: 1045 except AttributeError:
1046 assert other_candidate is None 1046 assert other_candidate is None
1047 1047
1048 def connectionEb(failure, client, candidate): 1048 def connection_eb(failure, client, candidate):
1049 if failure.check(defer.CancelledError): 1049 if failure.check(defer.CancelledError):
1050 log.debug("Connection of {} has been cancelled".format(candidate)) 1050 log.debug("Connection of {} has been cancelled".format(candidate))
1051 else: 1051 else:
1052 log.info( 1052 log.info(
1053 "Connection of {candidate} Failed: {error}".format( 1053 "Connection of {candidate} Failed: {error}".format(
1054 candidate=candidate, error=failure.value 1054 candidate=candidate, error=failure.value
1055 ) 1055 )
1056 ) 1056 )
1057 candidates[candidates.index(candidate)] = None 1057 candidates[candidates.index(candidate)] = None
1058 1058
1059 def allTested(__): 1059 def all_tested(__):
1060 log.debug("All candidates have been tested") 1060 log.debug("All candidates have been tested")
1061 good_candidates = [c for c in candidates if c] 1061 good_candidates = [c for c in candidates if c]
1062 return good_candidates[0] if good_candidates else None 1062 return good_candidates[0] if good_candidates else None
1063 1063
1064 defer_candidates = self.tryCandidates( 1064 defer_candidates = self.try_candidates(
1065 client, 1065 client,
1066 candidates, 1066 candidates,
1067 session_hash, 1067 session_hash,
1068 peer_session_hash, 1068 peer_session_hash,
1069 connectionCb, 1069 connection_cb,
1070 connectionEb, 1070 connection_eb,
1071 ) 1071 )
1072 d_list = defer.DeferredList(defer_candidates) 1072 d_list = defer.DeferredList(defer_candidates)
1073 d_list.addCallback(allTested) 1073 d_list.addCallback(all_tested)
1074 return d_list 1074 return d_list
1075 1075
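get_best_candidate races the candidate connections and cancels lower-priority attempts as soon as a better one succeeds; a self-contained miniature of that DeferredList/cancel interplay (independent of the plugin's classes, purely illustrative):

    from twisted.internet import defer

    winners = []

    def on_success(value, others):
        winners.append(value)
        for d in others:
            d.cancel()          # cancel the lower-priority attempts

    d1, d2 = defer.Deferred(), defer.Deferred()
    d1.addCallback(on_success, [d2])
    d2.addErrback(lambda f: f.trap(defer.CancelledError))
    dl = defer.DeferredList([d1, d2])
    d1.callback("best candidate")
    assert winners == ["best candidate"] and dl.called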
1076 def _timeOut(self, session_hash, client): 1076 def _time_out(self, session_hash, client):
1077 """Called when stream was not started quickly enough 1077 """Called when stream was not started quickly enough
1078 1078
1079 @param session_hash(str): hash as returned by getSessionHash 1079 @param session_hash(str): hash as returned by get_session_hash
1080 @param client: %(doc_client)s 1080 @param client: %(doc_client)s
1081 """ 1081 """
1082 log.info("Socks5 Bytestream: TimeOut reached") 1082 log.info("Socks5 Bytestream: TimeOut reached")
1083 session = self.getSession(client, session_hash) 1083 session = self.getSession(client, session_hash)
1084 session[DEFER_KEY].errback(exceptions.TimeOutError()) 1084 session[DEFER_KEY].errback(exceptions.TimeOutError())
1085 1085
1086 def killSession(self, failure_, session_hash, sid, client): 1086 def kill_session(self, failure_, session_hash, sid, client):
1087 """Clean the current session 1087 """Clean the current session
1088 1088
1089 @param session_hash(str): hash as returned by getSessionHash 1089 @param session_hash(str): hash as returned by get_session_hash
1090 @param sid(None, unicode): session id 1090 @param sid(None, unicode): session id
1091 or None if self.xep_0065_sid_session was not used 1091 or None if self.xep_0065_sid_session was not used
1092 @param client: %(doc_client)s 1092 @param client: %(doc_client)s
1093 @param failure_(None, failure.Failure): None if everything was fine, a failure otherwise 1093 @param failure_(None, failure.Failure): None if everything was fine, a failure otherwise
1094 @return (None, failure.Failure): failure_ is returned 1094 @return (None, failure.Failure): failure_ is returned
1126 except (internet_error.AlreadyCalled, internet_error.AlreadyCancelled): 1126 except (internet_error.AlreadyCalled, internet_error.AlreadyCancelled):
1127 pass 1127 pass
1128 1128
1129 return failure_ 1129 return failure_
1130 1130
1131 def startStream(self, client, stream_object, local_jid, to_jid, sid): 1131 def start_stream(self, client, stream_object, local_jid, to_jid, sid):
1132 """Launch the stream workflow 1132 """Launch the stream workflow
1133 1133
1134 @param stream_object: stream object to use 1134 @param stream_object: stream object to use
1135 @param local_jid(jid.JID): same as for [getCandidates] 1135 @param local_jid(jid.JID): same as for [get_candidates]
1136 @param to_jid: JID of the recipient 1136 @param to_jid: JID of the recipient
1137 @param sid: Stream session id 1137 @param sid: Stream session id
1138 @param successCb: method to call when the stream successfully finished 1138 @param successCb: method to call when the stream successfully finished
1139 @param failureCb: method to call when something goes wrong 1139 @param failureCb: method to call when something goes wrong
1140 @return (D): Deferred fired when session is finished 1140 @return (D): Deferred fired when session is finished
1141 """ 1141 """
1142 session_data = self._createSession( 1142 session_data = self._create_session(
1143 client, stream_object, local_jid, to_jid, sid, True) 1143 client, stream_object, local_jid, to_jid, sid, True)
1144 1144
1145 session_data[client] = client 1145 session_data[client] = client
1146 1146
1147 def gotCandidates(candidates): 1147 def got_candidates(candidates):
1148 session_data["candidates"] = candidates 1148 session_data["candidates"] = candidates
1149 iq_elt = client.IQ() 1149 iq_elt = client.IQ()
1150 iq_elt["from"] = local_jid.full() 1150 iq_elt["from"] = local_jid.full()
1151 iq_elt["to"] = to_jid.full() 1151 iq_elt["to"] = to_jid.full()
1152 query_elt = iq_elt.addElement((NS_BS, "query")) 1152 query_elt = iq_elt.addElement((NS_BS, "query"))
1160 streamhost["jid"] = candidate.jid.full() 1160 streamhost["jid"] = candidate.jid.full()
1161 log.debug("Candidate proposed: {}".format(candidate)) 1161 log.debug("Candidate proposed: {}".format(candidate))
1162 1162
1163 d = iq_elt.send() 1163 d = iq_elt.send()
1164 args = [client, session_data, local_jid] 1164 args = [client, session_data, local_jid]
1165 d.addCallbacks(self._IQNegotiationCb, self._IQNegotiationEb, args, None, args) 1165 d.addCallbacks(self._iq_negotiation_cb, self._iq_negotiation_eb, args, None, args)
1166 1166
1167 self.getCandidates(client, local_jid).addCallback(gotCandidates) 1167 self.get_candidates(client, local_jid).addCallback(got_candidates)
1168 return session_data[DEFER_KEY] 1168 return session_data[DEFER_KEY]
1169 1169
1170 def _IQNegotiationCb(self, iq_elt, client, session_data, local_jid): 1170 def _iq_negotiation_cb(self, iq_elt, client, session_data, local_jid):
1171 """Called when the result of open iq is received 1171 """Called when the result of open iq is received
1172 1172
1173 @param session_data(dict): data of the session 1173 @param session_data(dict): data of the session
1174 @param client: %(doc_client)s 1174 @param client: %(doc_client)s
1175 @param iq_elt(domish.Element): <iq> result 1175 @param iq_elt(domish.Element): <iq> result
1195 else: 1195 else:
1196 log.info("Candidate choosed by target: {}".format(candidate)) 1196 log.info("Candidate choosed by target: {}".format(candidate))
1197 1197
1198 if candidate.type == XEP_0065.TYPE_PROXY: 1198 if candidate.type == XEP_0065.TYPE_PROXY:
1199 log.info("A Socks5 proxy is used") 1199 log.info("A Socks5 proxy is used")
1200 d = self.connectCandidate(client, candidate, session_data["hash"]) 1200 d = self.connect_candidate(client, candidate, session_data["hash"])
1201 d.addCallback( 1201 d.addCallback(
1202 lambda __: candidate.activate( 1202 lambda __: candidate.activate(
1203 client, session_data["id"], session_data["peer_jid"], local_jid 1203 client, session_data["id"], session_data["peer_jid"], local_jid
1204 ) 1204 )
1205 ) 1205 )
1206 d.addErrback(self._activationEb) 1206 d.addErrback(self._activation_eb)
1207 else: 1207 else:
1208 d = defer.succeed(None) 1208 d = defer.succeed(None)
1209 1209
1210 d.addCallback(lambda __: candidate.startTransfer(session_data["hash"])) 1210 d.addCallback(lambda __: candidate.start_transfer(session_data["hash"]))
1211 1211
1212 def _activationEb(self, failure): 1212 def _activation_eb(self, failure):
1213 log.warning("Proxy activation error: {}".format(failure.value)) 1213 log.warning("Proxy activation error: {}".format(failure.value))
1214 1214
1215 def _IQNegotiationEb(self, stanza_err, client, session_data, local_jid): 1215 def _iq_negotiation_eb(self, stanza_err, client, session_data, local_jid):
1216 log.warning("Socks5 transfer failed: {}".format(stanza_err.value)) 1216 log.warning("Socks5 transfer failed: {}".format(stanza_err.value))
1217 # FIXME: must clean session 1217 # FIXME: must clean session
1218 1218
1219 def createSession(self, *args, **kwargs): 1219 def create_session(self, *args, **kwargs):
1220 """like [_createSession] but return the session deferred instead of the whole session 1220 """like [_create_session] but return the session deferred instead of the whole session
1221 1221
1222 session deferred is fired when transfer is finished 1222 session deferred is fired when transfer is finished
1223 """ 1223 """
1224 return self._createSession(*args, **kwargs)[DEFER_KEY] 1224 return self._create_session(*args, **kwargs)[DEFER_KEY]
1225 1225
1226 def _createSession(self, client, stream_object, local_jid, to_jid, sid, 1226 def _create_session(self, client, stream_object, local_jid, to_jid, sid,
1227 requester=False): 1227 requester=False):
1228 """Called when a bytestream is imminent 1228 """Called when a bytestream is imminent
1229 1229
1230 @param stream_object(iface.IStreamProducer): File object where data will be 1230 @param stream_object(iface.IStreamProducer): File object where data will be
1231 written 1231 written
1235 @return (dict): session data 1235 @return (dict): session data
1236 """ 1236 """
1237 if sid in client.xep_0065_sid_session: 1237 if sid in client.xep_0065_sid_session:
1238 raise exceptions.ConflictError("A session with this id already exists !") 1238 raise exceptions.ConflictError("A session with this id already exists !")
1239 if requester: 1239 if requester:
1240 session_hash = getSessionHash(local_jid, to_jid, sid) 1240 session_hash = get_session_hash(local_jid, to_jid, sid)
1241 session_data = self._registerHash(client, session_hash, stream_object) 1241 session_data = self._register_hash(client, session_hash, stream_object)
1242 else: 1242 else:
1243 session_hash = getSessionHash(to_jid, local_jid, sid) 1243 session_hash = get_session_hash(to_jid, local_jid, sid)
1244 session_d = defer.Deferred() 1244 session_d = defer.Deferred()
1245 session_d.addBoth(self.killSession, session_hash, sid, client) 1245 session_d.addBoth(self.kill_session, session_hash, sid, client)
1246 session_data = client._s5b_sessions[session_hash] = { 1246 session_data = client._s5b_sessions[session_hash] = {
1247 DEFER_KEY: session_d, 1247 DEFER_KEY: session_d,
1248 TIMER_KEY: reactor.callLater( 1248 TIMER_KEY: reactor.callLater(
1249 TIMEOUT, self._timeOut, session_hash, client 1249 TIMEOUT, self._time_out, session_hash, client
1250 ), 1250 ),
1251 } 1251 }
1252 client.xep_0065_sid_session[sid] = session_data 1252 client.xep_0065_sid_session[sid] = session_data
1253 session_data.update( 1253 session_data.update(
1254 { 1254 {
1281 except KeyError as e: 1281 except KeyError as e:
1282 log.warning("The requested session doesn't exists !") 1282 log.warning("The requested session doesn't exists !")
1283 raise e 1283 raise e
1284 return client._s5b_sessions[session_hash] 1284 return client._s5b_sessions[session_hash]
1285 1285
1286 def registerHash(self, *args, **kwargs): 1286 def register_hash(self, *args, **kwargs):
1287 """like [_registerHash] but return the session deferred instead of the whole session 1287 """like [_register_hash] but return the session deferred instead of the whole session
1288 session deferred is fired when transfer is finished 1288 session deferred is fired when transfer is finished
1289 """ 1289 """
1290 return self._registerHash(*args, **kwargs)[DEFER_KEY] 1290 return self._register_hash(*args, **kwargs)[DEFER_KEY]
1291 1291
1292 def _registerHash(self, client, session_hash, stream_object): 1292 def _register_hash(self, client, session_hash, stream_object):
1293 """Create a session_data associated to hash 1293 """Create a session_data associated to hash
1294 1294
1295 @param session_hash(str): hash of the session 1295 @param session_hash(str): hash of the session
1296 @param stream_object(iface.IStreamProducer, IConsumer, None): file-like object 1296 @param stream_object(iface.IStreamProducer, IConsumer, None): file-like object
1297 None if it will be filled later 1297 None if it will be filled later
1298 @return (dict): session data 1298 @return (dict): session data
1299 """ 1299 """
1300 assert session_hash not in client._s5b_sessions 1300 assert session_hash not in client._s5b_sessions
1301 session_d = defer.Deferred() 1301 session_d = defer.Deferred()
1302 session_d.addBoth(self.killSession, session_hash, None, client) 1302 session_d.addBoth(self.kill_session, session_hash, None, client)
1303 session_data = client._s5b_sessions[session_hash] = { 1303 session_data = client._s5b_sessions[session_hash] = {
1304 DEFER_KEY: session_d, 1304 DEFER_KEY: session_d,
1305 TIMER_KEY: reactor.callLater(TIMEOUT, self._timeOut, session_hash, client), 1305 TIMER_KEY: reactor.callLater(TIMEOUT, self._time_out, session_hash, client),
1306 } 1306 }
1307 1307
1308 if stream_object is not None: 1308 if stream_object is not None:
1309 session_data["stream_object"] = stream_object 1309 session_data["stream_object"] = stream_object
1310 1310
1311 assert session_hash not in self.hash_clients_map 1311 assert session_hash not in self.hash_clients_map
1312 self.hash_clients_map[session_hash] = client 1312 self.hash_clients_map[session_hash] = client
1313 1313
1314 return session_data 1314 return session_data
1315 1315
1316 def associateStreamObject(self, client, session_hash, stream_object): 1316 def associate_stream_object(self, client, session_hash, stream_object):
1317 """Associate a stream object with a session""" 1317 """Associate a stream object with a session"""
1318 session_data = self.getSession(client, session_hash) 1318 session_data = self.getSession(client, session_hash)
1319 assert "stream_object" not in session_data 1319 assert "stream_object" not in session_data
1320 session_data["stream_object"] = stream_object 1320 session_data["stream_object"] = stream_object
1321 1321
1322 def streamQuery(self, iq_elt, client): 1322 def stream_query(self, iq_elt, client):
1323 log.debug("BS stream query") 1323 log.debug("BS stream query")
1324 1324
1325 iq_elt.handled = True 1325 iq_elt.handled = True
1326 1326
1327 query_elt = next(iq_elt.elements(NS_BS, "query")) 1327 query_elt = next(iq_elt.elements(NS_BS, "query"))
1359 candidates.append(Candidate(host, port, type_, priority, jid_)) 1359 candidates.append(Candidate(host, port, type_, priority, jid_))
1360 1360
1361 for candidate in candidates: 1361 for candidate in candidates:
1362 log.info("Candidate proposed: {}".format(candidate)) 1362 log.info("Candidate proposed: {}".format(candidate))
1363 1363
1364 d = self.getBestCandidate(client, candidates, session_data["hash"]) 1364 d = self.get_best_candidate(client, candidates, session_data["hash"])
1365 d.addCallback(self._ackStream, iq_elt, session_data, client) 1365 d.addCallback(self._ack_stream, iq_elt, session_data, client)
1366 1366
1367 def _ackStream(self, candidate, iq_elt, session_data, client): 1367 def _ack_stream(self, candidate, iq_elt, session_data, client):
1368 if candidate is None: 1368 if candidate is None:
1369 log.info("No streamhost candidate worked, we have to end negotiation") 1369 log.info("No streamhost candidate worked, we have to end negotiation")
1370 return client.sendError(iq_elt, "item-not-found") 1370 return client.sendError(iq_elt, "item-not-found")
1371 log.info("We choose: {}".format(candidate)) 1371 log.info("We choose: {}".format(candidate))
1372 result_elt = xmlstream.toResponse(iq_elt, "result") 1372 result_elt = xmlstream.toResponse(iq_elt, "result")
1384 self.plugin_parent = plugin_parent 1384 self.plugin_parent = plugin_parent
1385 self.host = plugin_parent.host 1385 self.host = plugin_parent.host
1386 1386
1387 def connectionInitialized(self): 1387 def connectionInitialized(self):
1388 self.xmlstream.addObserver( 1388 self.xmlstream.addObserver(
1389 BS_REQUEST, self.plugin_parent.streamQuery, client=self.parent 1389 BS_REQUEST, self.plugin_parent.stream_query, client=self.parent
1390 ) 1390 )
1391 1391
1392 def getDiscoInfo(self, requestor, target, nodeIdentifier=""): 1392 def getDiscoInfo(self, requestor, target, nodeIdentifier=""):
1393 return [disco.DiscoFeature(NS_BS)] 1393 return [disco.DiscoFeature(NS_BS)]
1394 1394