comparison sat/plugins/plugin_xep_0065.py @ 3028:ab2696e34d29

Python 3 port: /!\ this is a huge commit /!\ starting from this commit, SàT needs Python 3.6+ /!\ SàT may be unstable or some features may not work anymore; this will improve with time. This patch ports backend, bridge and frontends to Python 3. Roughly this has been done this way: - the 2to3 tool has been applied (with python 3.7) - all references to python2 have been replaced with python3 (notably shebangs) - fixed files not handled by 2to3 (notably the shell script) - several manual fixes - fixed issues reported by Python 3 that were not handled in Python 2 - replaced "async" with "async_" when needed (it's a reserved word from Python 3.7) - replaced zope's "implements" with @implementer decorator - temporary hack to handle data pickled in database, as str or bytes may be returned, to be checked later - fixed hash comparison for password - removed some code which is not needed anymore with Python 3 - deactivated some code which needs to be checked (notably certificate validation) - tested with jp, fixed reported issues until some basic commands worked - ported Primitivus (after porting dependencies like urwid satext) - more manual fixes
author Goffi <goffi@goffi.org>
date Tue, 13 Aug 2019 19:08:41 +0200
parents 69e4716d6268
children fee60f17ebac
comparison
equal deleted inserted replaced
3027:ff5bcb12ae60 3028:ab2696e34d29
1 #!/usr/bin/env python2 1 #!/usr/bin/env python3
2 # -*- coding: utf-8 -*- 2 # -*- coding: utf-8 -*-
3 3
4 # SAT plugin for managing xep-0065 4 # SAT plugin for managing xep-0065
5 5
6 # Copyright (C) 6 # Copyright (C)
71 from collections import namedtuple 71 from collections import namedtuple
72 import struct 72 import struct
73 import hashlib 73 import hashlib
74 import uuid 74 import uuid
75 75
76 from zope.interface import implements 76 from zope.interface import implementer
77 77
78 try: 78 try:
79 from twisted.words.protocols.xmlstream import XMPPHandler 79 from twisted.words.protocols.xmlstream import XMPPHandler
80 except ImportError: 80 except ImportError:
81 from wokkel.subprotocols import XMPPHandler 81 from wokkel.subprotocols import XMPPHandler
142 STATE_READY, 142 STATE_READY,
143 STATE_AUTH_USERPASS, 143 STATE_AUTH_USERPASS,
144 STATE_CLIENT_INITIAL, 144 STATE_CLIENT_INITIAL,
145 STATE_CLIENT_AUTH, 145 STATE_CLIENT_AUTH,
146 STATE_CLIENT_REQUEST, 146 STATE_CLIENT_REQUEST,
147 ) = xrange(8) 147 ) = range(8)
148 148
149 SOCKS5_VER = 0x05 149 SOCKS5_VER = 0x05
150 150
151 ADDR_IPV4 = 0x01 151 ADDR_IPV4 = 0x01
152 ADDR_DOMAINNAME = 0x03 152 ADDR_DOMAINNAME = 0x03
196 @param priority_local(bool): if True, priority is used as local priority, 196 @param priority_local(bool): if True, priority is used as local priority,
197 else priority is used as global one (and local priority is set to 0) 197 else priority is used as global one (and local priority is set to 0)
198 """ 198 """
199 assert isinstance(jid_, jid.JID) 199 assert isinstance(jid_, jid.JID)
200 self.host, self.port, self.type, self.jid = (host, int(port), type_, jid_) 200 self.host, self.port, self.type, self.jid = (host, int(port), type_, jid_)
201 self.id = id_ if id_ is not None else unicode(uuid.uuid4()) 201 self.id = id_ if id_ is not None else str(uuid.uuid4())
202 if priority_local: 202 if priority_local:
203 self._local_priority = int(priority) 203 self._local_priority = int(priority)
204 self._priority = self.calculatePriority() 204 self._priority = self.calculatePriority()
205 else: 205 else:
206 self._local_priority = 0 206 self._local_priority = 0
210 def discard(self): 210 def discard(self):
211 """Disconnect a candidate if it is connected 211 """Disconnect a candidate if it is connected
212 212
213 Used to disconnect tryed client when they are discarded 213 Used to disconnect tryed client when they are discarded
214 """ 214 """
215 log.debug(u"Discarding {}".format(self)) 215 log.debug("Discarding {}".format(self))
216 try: 216 try:
217 self.factory.discard() 217 self.factory.discard()
218 except AttributeError: 218 except AttributeError:
219 pass # no discard for Socks5ServerFactory 219 pass # no discard for Socks5ServerFactory
220 220
228 228
229 def __str__(self): 229 def __str__(self):
230 # similar to __unicode__ but we don't show jid and we encode id 230 # similar to __unicode__ but we don't show jid and we encode id
231 return "Candidate ({0.priority}): host={0.host} port={0.port} type={0.type}{id}".format( 231 return "Candidate ({0.priority}): host={0.host} port={0.port} type={0.type}{id}".format(
232 self, 232 self,
233 id=u" id={}".format(self.id if self.id is not None else u"").encode( 233 id=" id={}".format(self.id if self.id is not None else "").encode(
234 "utf-8", "ignore" 234 "utf-8", "ignore"
235 ), 235 ),
236 ) 236 )
237 237
238 def __unicode__(self): 238 def __unicode__(self):
239 return u"Candidate ({0.priority}): host={0.host} port={0.port} jid={0.jid} type={0.type}{id}".format( 239 return "Candidate ({0.priority}): host={0.host} port={0.port} jid={0.jid} type={0.type}{id}".format(
240 self, id=u" id={}".format(self.id if self.id is not None else u"") 240 self, id=" id={}".format(self.id if self.id is not None else "")
241 ) 241 )
242 242
243 def __eq__(self, other): 243 def __eq__(self, other):
244 # self.id is is not used in __eq__ as the same candidate can have 244 # self.id is is not used in __eq__ as the same candidate can have
245 # different ids if proposed by initiator or responder 245 # different ids if proposed by initiator or responder
268 elif self.type == XEP_0065.TYPE_TUNEL: 268 elif self.type == XEP_0065.TYPE_TUNEL:
269 multiplier = 110 269 multiplier = 110
270 elif self.type == XEP_0065.TYPE_PROXY: 270 elif self.type == XEP_0065.TYPE_PROXY:
271 multiplier = 10 271 multiplier = 10
272 else: 272 else:
273 raise exceptions.InternalError(u"Unknown {} type !".format(self.type)) 273 raise exceptions.InternalError("Unknown {} type !".format(self.type))
274 return 2 ** 16 * multiplier + self._local_priority 274 return 2 ** 16 * multiplier + self._local_priority
275 275
276 def activate(self, client, sid, peer_jid, local_jid): 276 def activate(self, client, sid, peer_jid, local_jid):
277 """Activate the proxy candidate 277 """Activate the proxy candidate
278 278
320 @param session_hash(str): hash of the session 320 @param session_hash(str): hash of the session
321 must only be used in client mode 321 must only be used in client mode
322 """ 322 """
323 self.connection = defer.Deferred() # called when connection/auth is done 323 self.connection = defer.Deferred() # called when connection/auth is done
324 if session_hash is not None: 324 if session_hash is not None:
325 assert isinstance(session_hash, str)
325 self.server_mode = False 326 self.server_mode = False
326 self._session_hash = session_hash 327 self._session_hash = session_hash
327 self.state = STATE_CLIENT_INITIAL 328 self.state = STATE_CLIENT_INITIAL
328 else: 329 else:
329 self.server_mode = True 330 self.server_mode = True
330 self.state = STATE_INITIAL 331 self.state = STATE_INITIAL
331 self.buf = "" 332 self.buf = b""
332 self.supportedAuthMechs = [AUTHMECH_ANON] 333 self.supportedAuthMechs = [AUTHMECH_ANON]
333 self.supportedAddrs = [ADDR_DOMAINNAME] 334 self.supportedAddrs = [ADDR_DOMAINNAME]
334 self.enabledCommands = [CMD_CONNECT] 335 self.enabledCommands = [CMD_CONNECT]
335 self.peersock = None 336 self.peersock = None
336 self.addressType = 0 337 self.addressType = 0
388 # Complete negotiation w/ this method 389 # Complete negotiation w/ this method
389 self.transport.write(struct.pack("!BB", SOCKS5_VER, m)) 390 self.transport.write(struct.pack("!BB", SOCKS5_VER, m))
390 return 391 return
391 392
392 # No supported mechs found, notify client and close the connection 393 # No supported mechs found, notify client and close the connection
393 log.warning(u"Unsupported authentication mechanism") 394 log.warning("Unsupported authentication mechanism")
394 self.transport.write(struct.pack("!BB", SOCKS5_VER, AUTHMECH_INVALID)) 395 self.transport.write(struct.pack("!BB", SOCKS5_VER, AUTHMECH_INVALID))
395 self.transport.loseConnection() 396 self.transport.loseConnection()
396 except struct.error: 397 except struct.error:
397 pass 398 pass
398 399
436 # Deal with addresses 437 # Deal with addresses
437 if self.addressType == ADDR_IPV4: 438 if self.addressType == ADDR_IPV4:
438 addr, port = struct.unpack("!IH", self.buf[4:10]) 439 addr, port = struct.unpack("!IH", self.buf[4:10])
439 self.buf = self.buf[10:] 440 self.buf = self.buf[10:]
440 elif self.addressType == ADDR_DOMAINNAME: 441 elif self.addressType == ADDR_DOMAINNAME:
441 nlen = ord(self.buf[4]) 442 nlen = self.buf[4]
442 addr, port = struct.unpack("!%dsH" % nlen, self.buf[5:]) 443 addr, port = struct.unpack("!%dsH" % nlen, self.buf[5:])
443 self.buf = self.buf[7 + len(addr) :] 444 self.buf = self.buf[7 + len(addr) :]
444 else: 445 else:
445 # Any other address types are not supported 446 # Any other address types are not supported
446 self.sendErrorReply(REPLY_ADDR_NOT_SUPPORTED) 447 self.sendErrorReply(REPLY_ADDR_NOT_SUPPORTED)
464 except struct.error: 465 except struct.error:
465 # The buffer is probably not complete, we need to wait more 466 # The buffer is probably not complete, we need to wait more
466 return None 467 return None
467 468
468 def _makeRequest(self): 469 def _makeRequest(self):
469 hash_ = self._session_hash 470 hash_ = self._session_hash.encode('utf-8')
470 request = struct.pack( 471 request = struct.pack(
471 "!5B%dsH" % len(hash_), 472 "!5B%dsH" % len(hash_),
472 SOCKS5_VER, 473 SOCKS5_VER,
473 CMD_CONNECT, 474 CMD_CONNECT,
474 0, 475 0,
491 # Deal with addresses 492 # Deal with addresses
492 if self.addressType == ADDR_IPV4: 493 if self.addressType == ADDR_IPV4:
493 addr, port = struct.unpack("!IH", self.buf[4:10]) 494 addr, port = struct.unpack("!IH", self.buf[4:10])
494 self.buf = self.buf[10:] 495 self.buf = self.buf[10:]
495 elif self.addressType == ADDR_DOMAINNAME: 496 elif self.addressType == ADDR_DOMAINNAME:
496 nlen = ord(self.buf[4]) 497 nlen = self.buf[4]
497 addr, port = struct.unpack("!%dsH" % nlen, self.buf[5:]) 498 addr, port = struct.unpack("!%dsH" % nlen, self.buf[5:])
498 self.buf = self.buf[7 + len(addr) :] 499 self.buf = self.buf[7 + len(addr) :]
499 else: 500 else:
500 # Any other address types are not supported 501 # Any other address types are not supported
501 self.sendErrorReply(REPLY_ADDR_NOT_SUPPORTED) 502 self.sendErrorReply(REPLY_ADDR_NOT_SUPPORTED)
513 # The buffer is probably not complete, we need to wait more 514 # The buffer is probably not complete, we need to wait more
514 return None 515 return None
515 516
516 def connectionMade(self): 517 def connectionMade(self):
517 log.debug( 518 log.debug(
518 u"Socks5 connectionMade (mode = {})".format( 519 "Socks5 connectionMade (mode = {})".format(
519 "server" if self.state == STATE_INITIAL else "client" 520 "server" if self.state == STATE_INITIAL else "client"
520 ) 521 )
521 ) 522 )
522 if self.state == STATE_CLIENT_INITIAL: 523 if self.state == STATE_CLIENT_INITIAL:
523 self._startNegotiation() 524 self._startNegotiation()
524 525
525 def connectRequested(self, addr, port): 526 def connectRequested(self, addr, port):
526 # Check that this session is expected 527 # Check that this session is expected
527 if not self.factory.addToSession(addr, self): 528 if not self.factory.addToSession(addr.decode('utf-8'), self):
528 self.sendErrorReply(REPLY_CONN_REFUSED) 529 self.sendErrorReply(REPLY_CONN_REFUSED)
529 log.warning( 530 log.warning(
530 u"Unexpected connection request received from {host}".format( 531 "Unexpected connection request received from {host}".format(
531 host=self.transport.getPeer().host 532 host=self.transport.getPeer().host
532 ) 533 )
533 ) 534 )
534 return 535 return
535 self._session_hash = addr 536 self._session_hash = addr.decode('utf-8')
536 self.connectCompleted(addr, 0) 537 self.connectCompleted(addr, 0)
537 538
538 def startTransfer(self, chunk_size): 539 def startTransfer(self, chunk_size):
539 """Callback called when the result iq is received 540 """Callback called when the result iq is received
540 541
541 @param chunk_size(None, int): size of the buffer, or None for default 542 @param chunk_size(None, int): size of the buffer, or None for default
542 """ 543 """
543 self.active = True 544 self.active = True
544 if chunk_size is not None: 545 if chunk_size is not None:
545 self.CHUNK_SIZE = chunk_size 546 self.CHUNK_SIZE = chunk_size
546 log.debug(u"Starting file transfer") 547 log.debug("Starting file transfer")
547 d = self.stream_object.startStream(self.transport) 548 d = self.stream_object.startStream(self.transport)
548 d.addCallback(self.streamFinished) 549 d.addCallback(self.streamFinished)
549 550
550 def streamFinished(self, d): 551 def streamFinished(self, d):
551 log.info(_("File transfer completed, closing connection")) 552 log.info(_("File transfer completed, closing connection"))
573 def bindRequested(self, addr, port): 574 def bindRequested(self, addr, port):
574 pass 575 pass
575 576
576 def authenticateUserPass(self, user, passwd): 577 def authenticateUserPass(self, user, passwd):
577 # FIXME: implement authentication and remove the debug printing a password 578 # FIXME: implement authentication and remove the debug printing a password
578 log.debug(u"User/pass: %s/%s" % (user, passwd)) 579 log.debug("User/pass: %s/%s" % (user, passwd))
579 return True 580 return True
580 581
581 def dataReceived(self, buf): 582 def dataReceived(self, buf):
582 if self.state == STATE_READY: 583 if self.state == STATE_READY:
583 # Everything is set, we just have to write the incoming data 584 # Everything is set, we just have to write the incoming data
603 self.transport.loseConnection() 604 self.transport.loseConnection()
604 else: 605 else:
605 self._makeRequest() 606 self._makeRequest()
606 607
607 def connectionLost(self, reason): 608 def connectionLost(self, reason):
608 log.debug(u"Socks5 connection lost: {}".format(reason.value)) 609 log.debug("Socks5 connection lost: {}".format(reason.value))
609 if self.state != STATE_READY: 610 if self.state != STATE_READY:
610 self.connection.errback(reason) 611 self.connection.errback(reason)
611 if self.server_mode: 612 if self.server_mode:
612 self.factory.removeFromSession(self._session_hash, self, reason) 613 self.factory.removeFromSession(self._session_hash, self, reason)
613 614
627 def startTransfer(self, session_hash, chunk_size=None): 628 def startTransfer(self, session_hash, chunk_size=None):
628 session = self.getSession(session_hash) 629 session = self.getSession(session_hash)
629 try: 630 try:
630 protocol = session["protocols"][0] 631 protocol = session["protocols"][0]
631 except (KeyError, IndexError): 632 except (KeyError, IndexError):
632 log.error(u"Can't start file transfer, can't find protocol") 633 log.error("Can't start file transfer, can't find protocol")
633 else: 634 else:
634 session[TIMER_KEY].cancel() 635 session[TIMER_KEY].cancel()
635 protocol.startTransfer(chunk_size) 636 protocol.startTransfer(chunk_size)
636 637
637 def addToSession(self, session_hash, protocol): 638 def addToSession(self, session_hash, protocol):
640 the session will be associated to the corresponding candidate 641 the session will be associated to the corresponding candidate
641 @param session_hash(str): hash of the session 642 @param session_hash(str): hash of the session
642 @param protocol(SOCKSv5): protocol instance 643 @param protocol(SOCKSv5): protocol instance
643 @param return(bool): True if hash was valid (i.e. expected), False else 644 @param return(bool): True if hash was valid (i.e. expected), False else
644 """ 645 """
646 assert isinstance(session_hash, str)
645 try: 647 try:
646 session_data = self.getSession(session_hash) 648 session_data = self.getSession(session_hash)
647 except KeyError: 649 except KeyError:
648 return False 650 return False
649 else: 651 else:
661 """ 663 """
662 try: 664 try:
663 protocols = self.getSession(session_hash)["protocols"] 665 protocols = self.getSession(session_hash)["protocols"]
664 protocols.remove(protocol) 666 protocols.remove(protocol)
665 except (KeyError, ValueError): 667 except (KeyError, ValueError):
666 log.error(u"Protocol not found in session while it should be there") 668 log.error("Protocol not found in session while it should be there")
667 else: 669 else:
668 if protocol.active: 670 if protocol.active:
669 # The active protocol has been removed, session is finished 671 # The active protocol has been removed, session is finished
670 if reason.check(internet_error.ConnectionDone): 672 if reason.check(internet_error.ConnectionDone):
671 self.getSession(session_hash)[DEFER_KEY].callback(None) 673 self.getSession(session_hash)[DEFER_KEY].callback(None)
703 def startTransfer(self, __=None, chunk_size=None): 705 def startTransfer(self, __=None, chunk_size=None):
704 self.session[TIMER_KEY].cancel() 706 self.session[TIMER_KEY].cancel()
705 self._protocol_instance.startTransfer(chunk_size) 707 self._protocol_instance.startTransfer(chunk_size)
706 708
707 def clientConnectionFailed(self, connector, reason): 709 def clientConnectionFailed(self, connector, reason):
708 log.debug(u"Connection failed") 710 log.debug("Connection failed")
709 self.connection.errback(reason) 711 self.connection.errback(reason)
710 712
711 def clientConnectionLost(self, connector, reason): 713 def clientConnectionLost(self, connector, reason):
712 log.debug(_(u"Socks 5 client connection lost (reason: %s)") % reason.value) 714 log.debug(_("Socks 5 client connection lost (reason: %s)") % reason.value)
713 if self._protocol_instance.active: 715 if self._protocol_instance.active:
714 # This one was used for the transfer, than mean that 716 # This one was used for the transfer, than mean that
715 # the Socks5 session is finished 717 # the Socks5 session is finished
716 if reason.check(internet_error.ConnectionDone): 718 if reason.check(internet_error.ConnectionDone):
717 self.getSession()[DEFER_KEY].callback(None) 719 self.getSession()[DEFER_KEY].callback(None)
751 # plugins shortcuts 753 # plugins shortcuts
752 self._ip = self.host.plugins["IP"] 754 self._ip = self.host.plugins["IP"]
753 try: 755 try:
754 self._np = self.host.plugins["NAT-PORT"] 756 self._np = self.host.plugins["NAT-PORT"]
755 except KeyError: 757 except KeyError:
756 log.debug(u"NAT Port plugin not available") 758 log.debug("NAT Port plugin not available")
757 self._np = None 759 self._np = None
758 760
759 # parameters 761 # parameters
760 # XXX: params are not used for now, but they may be used in the futur to force proxy/IP 762 # XXX: params are not used for now, but they may be used in the futur to force proxy/IP
761 # host.memory.updateParams(PARAMS) 763 # host.memory.updateParams(PARAMS)
777 self._server_factory_port is set on server creation 779 self._server_factory_port is set on server creation
778 """ 780 """
779 781
780 if self._server_factory is None: 782 if self._server_factory is None:
781 self._server_factory = Socks5ServerFactory(self) 783 self._server_factory = Socks5ServerFactory(self)
782 for port in xrange(SERVER_STARTING_PORT, 65356): 784 for port in range(SERVER_STARTING_PORT, 65356):
783 try: 785 try:
784 listening_port = reactor.listenTCP(port, self._server_factory) 786 listening_port = reactor.listenTCP(port, self._server_factory)
785 except internet_error.CannotListenError as e: 787 except internet_error.CannotListenError as e:
786 log.debug( 788 log.debug(
787 u"Cannot listen on port {port}: {err_msg}{err_num}".format( 789 "Cannot listen on port {port}: {err_msg}{err_num}".format(
788 port=port, 790 port=port,
789 err_msg=e.socketError.strerror, 791 err_msg=e.socketError.strerror,
790 err_num=u" (error code: {})".format(e.socketError.errno), 792 err_num=" (error code: {})".format(e.socketError.errno),
791 ) 793 )
792 ) 794 )
793 else: 795 else:
794 self._server_factory_port = listening_port.getHost().port 796 self._server_factory_port = listening_port.getHost().port
795 break 797 break
811 or None if not acceptable proxy is found 813 or None if not acceptable proxy is found
812 @raise exceptions.NotFound: no Proxy found 814 @raise exceptions.NotFound: no Proxy found
813 """ 815 """
814 816
815 def notFound(server): 817 def notFound(server):
816 log.info(u"No proxy found on this server") 818 log.info("No proxy found on this server")
817 self._cache_proxies[server] = None 819 self._cache_proxies[server] = None
818 raise exceptions.NotFound 820 raise exceptions.NotFound
819 821
820 server = client.host if client.is_component else client.jid.host 822 server = client.host if client.is_component else client.jid.host
821 try: 823 try:
835 837
836 try: 838 try:
837 result_elt = yield iq_elt.send() 839 result_elt = yield iq_elt.send()
838 except jabber_error.StanzaError as failure: 840 except jabber_error.StanzaError as failure:
839 log.warning( 841 log.warning(
840 u"Error while requesting proxy info on {jid}: {error}".format( 842 "Error while requesting proxy info on {jid}: {error}".format(
841 proxy.full(), failure 843 proxy.full(), failure
842 ) 844 )
843 ) 845 )
844 notFound(server) 846 notFound(server)
845 847
846 try: 848 try:
847 query_elt = result_elt.elements(NS_BS, "query").next() 849 query_elt = next(result_elt.elements(NS_BS, "query"))
848 streamhost_elt = query_elt.elements(NS_BS, "streamhost").next() 850 streamhost_elt = next(query_elt.elements(NS_BS, "streamhost"))
849 host = streamhost_elt["host"] 851 host = streamhost_elt["host"]
850 jid_ = streamhost_elt["jid"] 852 jid_ = streamhost_elt["jid"]
851 port = streamhost_elt["port"] 853 port = streamhost_elt["port"]
852 if not all((host, jid, port)): 854 if not all((host, jid, port)):
853 raise KeyError 855 raise KeyError
854 jid_ = jid.JID(jid_) 856 jid_ = jid.JID(jid_)
855 except (StopIteration, KeyError, RuntimeError, jid.InvalidFormat, AttributeError): 857 except (StopIteration, KeyError, RuntimeError, jid.InvalidFormat, AttributeError):
856 log.warning(u"Invalid proxy data received from {}".format(proxy.full())) 858 log.warning("Invalid proxy data received from {}".format(proxy.full()))
857 notFound(server) 859 notFound(server)
858 860
859 proxy_infos = self._cache_proxies[server] = ProxyInfos(host, jid_, port) 861 proxy_infos = self._cache_proxies[server] = ProxyInfos(host, jid_, port)
860 log.info(u"Proxy found: {}".format(proxy_infos)) 862 log.info("Proxy found: {}".format(proxy_infos))
861 defer.returnValue(proxy_infos) 863 defer.returnValue(proxy_infos)
862 864
863 @defer.inlineCallbacks 865 @defer.inlineCallbacks
864 def _getNetworkData(self, client): 866 def _getNetworkData(self, client):
865 """Retrieve information about network 867 """Retrieve information about network
872 external_ip = yield self._ip.getExternalIP(client) 874 external_ip = yield self._ip.getExternalIP(client)
873 local_ips = yield self._ip.getLocalIPs(client) 875 local_ips = yield self._ip.getLocalIPs(client)
874 876
875 if external_ip is not None and self._external_port is None: 877 if external_ip is not None and self._external_port is None:
876 if external_ip != local_ips[0]: 878 if external_ip != local_ips[0]:
877 log.info(u"We are probably behind a NAT") 879 log.info("We are probably behind a NAT")
878 if self._np is None: 880 if self._np is None:
879 log.warning(u"NAT port plugin not available, we can't map port") 881 log.warning("NAT port plugin not available, we can't map port")
880 else: 882 else:
881 ext_port = yield self._np.mapPort( 883 ext_port = yield self._np.mapPort(
882 local_port, desc=u"SaT socks5 stream" 884 local_port, desc="SaT socks5 stream"
883 ) 885 )
884 if ext_port is None: 886 if ext_port is None:
885 log.warning(u"Can't map NAT port") 887 log.warning("Can't map NAT port")
886 else: 888 else:
887 self._external_port = ext_port 889 self._external_port = ext_port
888 890
889 defer.returnValue((local_port, self._external_port, local_ips, external_ip)) 891 defer.returnValue((local_port, self._external_port, local_ips, external_ip))
890 892
1051 @return (D(None, Candidate)): best candidate or None if none can connect 1053 @return (D(None, Candidate)): best candidate or None if none can connect
1052 """ 1054 """
1053 defer_candidates = None 1055 defer_candidates = None
1054 1056
1055 def connectionCb(client, candidate): 1057 def connectionCb(client, candidate):
1056 log.info(u"Connection of {} successful".format(unicode(candidate))) 1058 log.info("Connection of {} successful".format(str(candidate)))
1057 for idx, other_candidate in enumerate(candidates): 1059 for idx, other_candidate in enumerate(candidates):
1058 try: 1060 try:
1059 if other_candidate.priority < candidate.priority: 1061 if other_candidate.priority < candidate.priority:
1060 log.debug(u"Cancelling {}".format(other_candidate)) 1062 log.debug("Cancelling {}".format(other_candidate))
1061 defer_candidates[idx].cancel() 1063 defer_candidates[idx].cancel()
1062 except AttributeError: 1064 except AttributeError:
1063 assert other_candidate is None 1065 assert other_candidate is None
1064 1066
1065 def connectionEb(failure, client, candidate): 1067 def connectionEb(failure, client, candidate):
1066 if failure.check(defer.CancelledError): 1068 if failure.check(defer.CancelledError):
1067 log.debug(u"Connection of {} has been cancelled".format(candidate)) 1069 log.debug("Connection of {} has been cancelled".format(candidate))
1068 else: 1070 else:
1069 log.info( 1071 log.info(
1070 u"Connection of {candidate} Failed: {error}".format( 1072 "Connection of {candidate} Failed: {error}".format(
1071 candidate=candidate, error=failure.value 1073 candidate=candidate, error=failure.value
1072 ) 1074 )
1073 ) 1075 )
1074 candidates[candidates.index(candidate)] = None 1076 candidates[candidates.index(candidate)] = None
1075 1077
1076 def allTested(self): 1078 def allTested(self):
1077 log.debug(u"All candidates have been tested") 1079 log.debug("All candidates have been tested")
1078 good_candidates = [c for c in candidates if c] 1080 good_candidates = [c for c in candidates if c]
1079 return good_candidates[0] if good_candidates else None 1081 return good_candidates[0] if good_candidates else None
1080 1082
1081 defer_candidates = self.tryCandidates( 1083 defer_candidates = self.tryCandidates(
1082 client, 1084 client,
1094 """Called when stream was not started quickly enough 1096 """Called when stream was not started quickly enough
1095 1097
1096 @param session_hash(str): hash as returned by getSessionHash 1098 @param session_hash(str): hash as returned by getSessionHash
1097 @param client: %(doc_client)s 1099 @param client: %(doc_client)s
1098 """ 1100 """
1099 log.info(u"Socks5 Bytestream: TimeOut reached") 1101 log.info("Socks5 Bytestream: TimeOut reached")
1100 session = self.getSession(client, session_hash) 1102 session = self.getSession(client, session_hash)
1101 session[DEFER_KEY].errback(exceptions.TimeOutError) 1103 session[DEFER_KEY].errback(exceptions.TimeOutError)
1102 1104
1103 def killSession(self, failure_, session_hash, sid, client): 1105 def killSession(self, failure_, session_hash, sid, client):
1104 """Clean the current session 1106 """Clean the current session
1109 @param client: %(doc_client)s 1111 @param client: %(doc_client)s
1110 @param failure_(None, failure.Failure): None if eveything was fine, a failure else 1112 @param failure_(None, failure.Failure): None if eveything was fine, a failure else
1111 @return (None, failure.Failure): failure_ is returned 1113 @return (None, failure.Failure): failure_ is returned
1112 """ 1114 """
1113 log.debug( 1115 log.debug(
1114 u"Cleaning session with hash {hash}{id}: {reason}".format( 1116 "Cleaning session with hash {hash}{id}: {reason}".format(
1115 hash=session_hash, 1117 hash=session_hash,
1116 reason="" if failure_ is None else failure_.value, 1118 reason="" if failure_ is None else failure_.value,
1117 id="" if sid is None else u" (id: {})".format(sid), 1119 id="" if sid is None else " (id: {})".format(sid),
1118 ) 1120 )
1119 ) 1121 )
1120 1122
1121 try: 1123 try:
1122 assert self.hash_clients_map[session_hash] == client 1124 assert self.hash_clients_map[session_hash] == client
1126 1128
1127 if sid is not None: 1129 if sid is not None:
1128 try: 1130 try:
1129 del client.xep_0065_sid_session[sid] 1131 del client.xep_0065_sid_session[sid]
1130 except KeyError: 1132 except KeyError:
1131 log.warning(u"Session id {} is unknown".format(sid)) 1133 log.warning("Session id {} is unknown".format(sid))
1132 1134
1133 try: 1135 try:
1134 session_data = client._s5b_sessions[session_hash] 1136 session_data = client._s5b_sessions[session_hash]
1135 except KeyError: 1137 except KeyError:
1136 log.warning(u"There is no session with this hash") 1138 log.warning("There is no session with this hash")
1137 return 1139 return
1138 else: 1140 else:
1139 del client._s5b_sessions[session_hash] 1141 del client._s5b_sessions[session_hash]
1140 1142
1141 try: 1143 try:
1173 for candidate in candidates: 1175 for candidate in candidates:
1174 streamhost = query_elt.addElement("streamhost") 1176 streamhost = query_elt.addElement("streamhost")
1175 streamhost["host"] = candidate.host 1177 streamhost["host"] = candidate.host
1176 streamhost["port"] = str(candidate.port) 1178 streamhost["port"] = str(candidate.port)
1177 streamhost["jid"] = candidate.jid.full() 1179 streamhost["jid"] = candidate.jid.full()
1178 log.debug(u"Candidate proposed: {}".format(candidate)) 1180 log.debug("Candidate proposed: {}".format(candidate))
1179 1181
1180 d = iq_elt.send() 1182 d = iq_elt.send()
1181 args = [client, session_data, local_jid] 1183 args = [client, session_data, local_jid]
1182 d.addCallbacks(self._IQNegotiationCb, self._IQNegotiationEb, args, None, args) 1184 d.addCallbacks(self._IQNegotiationCb, self._IQNegotiationEb, args, None, args)
1183 1185
1190 @param session_data(dict): data of the session 1192 @param session_data(dict): data of the session
1191 @param client: %(doc_client)s 1193 @param client: %(doc_client)s
1192 @param iq_elt(domish.Element): <iq> result 1194 @param iq_elt(domish.Element): <iq> result
1193 """ 1195 """
1194 try: 1196 try:
1195 query_elt = iq_elt.elements(NS_BS, "query").next() 1197 query_elt = next(iq_elt.elements(NS_BS, "query"))
1196 streamhost_used_elt = query_elt.elements(NS_BS, "streamhost-used").next() 1198 streamhost_used_elt = next(query_elt.elements(NS_BS, "streamhost-used"))
1197 except StopIteration: 1199 except StopIteration:
1198 log.warning(u"No streamhost found in stream query") 1200 log.warning("No streamhost found in stream query")
1199 # FIXME: must clean session 1201 # FIXME: must clean session
1200 return 1202 return
1201 1203
1202 streamhost_jid = jid.JID(streamhost_used_elt["jid"]) 1204 streamhost_jid = jid.JID(streamhost_used_elt["jid"])
1203 try: 1205 try:
1204 candidate = ( 1206 candidate = next((
1205 c for c in session_data["candidates"] if c.jid == streamhost_jid 1207 c for c in session_data["candidates"] if c.jid == streamhost_jid
1206 ).next() 1208 ))
1207 except StopIteration: 1209 except StopIteration:
1208 log.warning( 1210 log.warning(
1209 u"Candidate [{jid}] is unknown !".format(jid=streamhost_jid.full()) 1211 "Candidate [{jid}] is unknown !".format(jid=streamhost_jid.full())
1210 ) 1212 )
1211 return 1213 return
1212 else: 1214 else:
1213 log.info(u"Candidate choosed by target: {}".format(candidate)) 1215 log.info("Candidate choosed by target: {}".format(candidate))
1214 1216
1215 if candidate.type == XEP_0065.TYPE_PROXY: 1217 if candidate.type == XEP_0065.TYPE_PROXY:
1216 log.info(u"A Socks5 proxy is used") 1218 log.info("A Socks5 proxy is used")
1217 d = self.connectCandidate(client, candidate, session_data["hash"]) 1219 d = self.connectCandidate(client, candidate, session_data["hash"])
1218 d.addCallback( 1220 d.addCallback(
1219 lambda __: candidate.activate( 1221 lambda __: candidate.activate(
1220 client, session_data["id"], session_data["peer_jid"], local_jid 1222 client, session_data["id"], session_data["peer_jid"], local_jid
1221 ) 1223 )
1225 d = defer.succeed(None) 1227 d = defer.succeed(None)
1226 1228
1227 d.addCallback(lambda __: candidate.startTransfer(session_data["hash"])) 1229 d.addCallback(lambda __: candidate.startTransfer(session_data["hash"]))
1228 1230
1229 def _activationEb(self, failure): 1231 def _activationEb(self, failure):
1230 log.warning(u"Proxy activation error: {}".format(failure.value)) 1232 log.warning("Proxy activation error: {}".format(failure.value))
1231 1233
1232 def _IQNegotiationEb(self, stanza_err, client, session_data, local_jid): 1234 def _IQNegotiationEb(self, stanza_err, client, session_data, local_jid):
1233 log.warning(u"Socks5 transfer failed: {}".format(stanza_err.value)) 1235 log.warning("Socks5 transfer failed: {}".format(stanza_err.value))
1234 # FIXME: must clean session 1236 # FIXME: must clean session
1235 1237
1236 def createSession(self, *args, **kwargs): 1238 def createSession(self, *args, **kwargs):
1237 """like [_createSession] but return the session deferred instead of the whole session 1239 """like [_createSession] but return the session deferred instead of the whole session
1238 1240
1250 @param sid(unicode): session id 1252 @param sid(unicode): session id
1251 @param initiator(bool): if True, this session is create by initiator 1253 @param initiator(bool): if True, this session is create by initiator
1252 @return (dict): session data 1254 @return (dict): session data
1253 """ 1255 """
1254 if sid in client.xep_0065_sid_session: 1256 if sid in client.xep_0065_sid_session:
1255 raise exceptions.ConflictError(u"A session with this id already exists !") 1257 raise exceptions.ConflictError("A session with this id already exists !")
1256 if requester: 1258 if requester:
1257 session_hash = getSessionHash(local_jid, to_jid, sid) 1259 session_hash = getSessionHash(local_jid, to_jid, sid)
1258 session_data = self._registerHash(client, session_hash, stream_object) 1260 session_data = self._registerHash(client, session_hash, stream_object)
1259 else: 1261 else:
1260 session_hash = getSessionHash(to_jid, local_jid, sid) 1262 session_hash = getSessionHash(to_jid, local_jid, sid)
1289 for incoming request received by Socks5ServerFactory). None must 1291 for incoming request received by Socks5ServerFactory). None must
1290 only be used by Socks5ServerFactory. 1292 only be used by Socks5ServerFactory.
1291 See comments below for details 1293 See comments below for details
1292 @return (dict): session data 1294 @return (dict): session data
1293 """ 1295 """
1296 assert isinstance(session_hash, str)
1294 if client is None: 1297 if client is None:
1295 try: 1298 try:
1296 client = self.hash_clients_map[session_hash] 1299 client = self.hash_clients_map[session_hash]
1297 except KeyError as e: 1300 except KeyError as e:
1298 log.warning(u"The requested session doesn't exists !") 1301 log.warning("The requested session doesn't exists !")
1299 raise e 1302 raise e
1300 return client._s5b_sessions[session_hash] 1303 return client._s5b_sessions[session_hash]
1301 1304
1302 def registerHash(self, *args, **kwargs): 1305 def registerHash(self, *args, **kwargs):
1303 """like [_registerHash] but return the session deferred instead of the whole session 1306 """like [_registerHash] but return the session deferred instead of the whole session
1334 session_data = self.getSession(client, session_hash) 1337 session_data = self.getSession(client, session_hash)
1335 assert "stream_object" not in session_data 1338 assert "stream_object" not in session_data
1336 session_data["stream_object"] = stream_object 1339 session_data["stream_object"] = stream_object
1337 1340
1338 def streamQuery(self, iq_elt, client): 1341 def streamQuery(self, iq_elt, client):
1339 log.debug(u"BS stream query") 1342 log.debug("BS stream query")
1340 1343
1341 iq_elt.handled = True 1344 iq_elt.handled = True
1342 1345
1343 query_elt = iq_elt.elements(NS_BS, "query").next() 1346 query_elt = next(iq_elt.elements(NS_BS, "query"))
1344 try: 1347 try:
1345 sid = query_elt["sid"] 1348 sid = query_elt["sid"]
1346 except KeyError: 1349 except KeyError:
1347 log.warning(u"Invalid bystreams request received") 1350 log.warning("Invalid bystreams request received")
1348 return client.sendError(iq_elt, "bad-request") 1351 return client.sendError(iq_elt, "bad-request")
1349 1352
1350 streamhost_elts = list(query_elt.elements(NS_BS, "streamhost")) 1353 streamhost_elts = list(query_elt.elements(NS_BS, "streamhost"))
1351 if not streamhost_elts: 1354 if not streamhost_elts:
1352 return client.sendError(iq_elt, "bad-request") 1355 return client.sendError(iq_elt, "bad-request")
1353 1356
1354 try: 1357 try:
1355 session_data = client.xep_0065_sid_session[sid] 1358 session_data = client.xep_0065_sid_session[sid]
1356 except KeyError: 1359 except KeyError:
1357 log.warning(u"Ignoring unexpected BS transfer: {}".format(sid)) 1360 log.warning("Ignoring unexpected BS transfer: {}".format(sid))
1358 return client.sendError(iq_elt, "not-acceptable") 1361 return client.sendError(iq_elt, "not-acceptable")
1359 1362
1360 peer_jid = session_data["peer_jid"] = jid.JID(iq_elt["from"]) 1363 peer_jid = session_data["peer_jid"] = jid.JID(iq_elt["from"])
1361 1364
1362 candidates = [] 1365 candidates = []
1363 nb_sh = len(streamhost_elts) 1366 nb_sh = len(streamhost_elts)
1364 for idx, sh_elt in enumerate(streamhost_elts): 1367 for idx, sh_elt in enumerate(streamhost_elts):
1365 try: 1368 try:
1366 host, port, jid_ = sh_elt["host"], sh_elt["port"], jid.JID(sh_elt["jid"]) 1369 host, port, jid_ = sh_elt["host"], sh_elt["port"], jid.JID(sh_elt["jid"])
1367 except KeyError: 1370 except KeyError:
1368 log.warning(u"malformed streamhost element") 1371 log.warning("malformed streamhost element")
1369 return client.sendError(iq_elt, "bad-request") 1372 return client.sendError(iq_elt, "bad-request")
1370 priority = nb_sh - idx 1373 priority = nb_sh - idx
1371 if jid_.userhostJID() != peer_jid.userhostJID(): 1374 if jid_.userhostJID() != peer_jid.userhostJID():
1372 type_ = XEP_0065.TYPE_PROXY 1375 type_ = XEP_0065.TYPE_PROXY
1373 else: 1376 else:
1374 type_ = XEP_0065.TYPE_DIRECT 1377 type_ = XEP_0065.TYPE_DIRECT
1375 candidates.append(Candidate(host, port, type_, priority, jid_)) 1378 candidates.append(Candidate(host, port, type_, priority, jid_))
1376 1379
1377 for candidate in candidates: 1380 for candidate in candidates:
1378 log.info(u"Candidate proposed: {}".format(candidate)) 1381 log.info("Candidate proposed: {}".format(candidate))
1379 1382
1380 d = self.getBestCandidate(client, candidates, session_data["hash"]) 1383 d = self.getBestCandidate(client, candidates, session_data["hash"])
1381 d.addCallback(self._ackStream, iq_elt, session_data, client) 1384 d.addCallback(self._ackStream, iq_elt, session_data, client)
1382 1385
1383 def _ackStream(self, candidate, iq_elt, session_data, client): 1386 def _ackStream(self, candidate, iq_elt, session_data, client):
1384 if candidate is None: 1387 if candidate is None:
1385 log.info("No streamhost candidate worked, we have to end negotiation") 1388 log.info("No streamhost candidate worked, we have to end negotiation")
1386 return client.sendError(iq_elt, "item-not-found") 1389 return client.sendError(iq_elt, "item-not-found")
1387 log.info(u"We choose: {}".format(candidate)) 1390 log.info("We choose: {}".format(candidate))
1388 result_elt = xmlstream.toResponse(iq_elt, "result") 1391 result_elt = xmlstream.toResponse(iq_elt, "result")
1389 query_elt = result_elt.addElement((NS_BS, "query")) 1392 query_elt = result_elt.addElement((NS_BS, "query"))
1390 query_elt["sid"] = session_data["id"] 1393 query_elt["sid"] = session_data["id"]
1391 streamhost_used_elt = query_elt.addElement("streamhost-used") 1394 streamhost_used_elt = query_elt.addElement("streamhost-used")
1392 streamhost_used_elt["jid"] = candidate.jid.full() 1395 streamhost_used_elt["jid"] = candidate.jid.full()
1393 client.send(result_elt) 1396 client.send(result_elt)
1394 1397
1395 1398
1399 @implementer(iwokkel.IDisco)
1396 class XEP_0065_handler(XMPPHandler): 1400 class XEP_0065_handler(XMPPHandler):
1397 implements(iwokkel.IDisco)
1398 1401
1399 def __init__(self, plugin_parent): 1402 def __init__(self, plugin_parent):
1400 self.plugin_parent = plugin_parent 1403 self.plugin_parent = plugin_parent
1401 self.host = plugin_parent.host 1404 self.host = plugin_parent.host
1402 1405