diff sat/plugins/plugin_xep_0065.py @ 2624:56f94936df1e

code style reformatting using black
author Goffi <goffi@goffi.org>
date Wed, 27 Jun 2018 20:14:46 +0200
parents 26edcf3a30eb
children 378188abe941
line wrap: on
line diff
--- a/sat/plugins/plugin_xep_0065.py	Wed Jun 27 07:51:29 2018 +0200
+++ b/sat/plugins/plugin_xep_0065.py	Wed Jun 27 20:14:46 2018 +0200
@@ -1,5 +1,5 @@
 #!/usr/bin/env python2
-#-*- coding: utf-8 -*-
+# -*- coding: utf-8 -*-
 
 # SAT plugin for managing xep-0065
 
@@ -56,6 +56,7 @@
 
 from sat.core.i18n import _
 from sat.core.log import getLogger
+
 log = getLogger(__name__)
 from sat.core.constants import Const as C
 from sat.core import exceptions
@@ -92,25 +93,27 @@
     C.PI_RECOMMENDATIONS: ["NAT-PORT"],
     C.PI_MAIN: "XEP_0065",
     C.PI_HANDLER: "yes",
-    C.PI_DESCRIPTION: _("""Implementation of SOCKS5 Bytestreams""")
+    C.PI_DESCRIPTION: _("""Implementation of SOCKS5 Bytestreams"""),
 }
 
 IQ_SET = '/iq[@type="set"]'
-NS_BS = 'http://jabber.org/protocol/bytestreams'
+NS_BS = "http://jabber.org/protocol/bytestreams"
 BS_REQUEST = IQ_SET + '/query[@xmlns="' + NS_BS + '"]'
-TIMER_KEY = 'timer'
-DEFER_KEY = 'finished' # key of the deferred used to track session end
-SERVER_STARTING_PORT = 0 # starting number for server port search (0 to ask automatic attribution)
+TIMER_KEY = "timer"
+DEFER_KEY = "finished"  # key of the deferred used to track session end
+SERVER_STARTING_PORT = (
+    0
+)  # starting number for server port search (0 to ask automatic attribution)
 
 # priorities are candidates local priorities, must be a int between 0 and 65535
 PRIORITY_BEST_DIRECT = 10000
 PRIORITY_DIRECT = 5000
 PRIORITY_ASSISTED = 1000
-PRIORITY_PROXY = 0.2 # proxy is the last option for s5b
-CANDIDATE_DELAY = 0.2 # see XEP-0260 §4
-CANDIDATE_DELAY_PROXY = 0.2 # additional time for proxy types (see XEP-0260 §4 note 3)
+PRIORITY_PROXY = 0.2  # proxy is the last option for s5b
+CANDIDATE_DELAY = 0.2  # see XEP-0260 §4
+CANDIDATE_DELAY_PROXY = 0.2  # additional time for proxy types (see XEP-0260 §4 note 3)
 
-TIMEOUT = 300 # maxium time between session creation and stream start
+TIMEOUT = 300  # maxium time between session creation and stream start
 
 # XXX: by default eveything is automatic
 # TODO: use these params to force use of specific proxy/port/IP
@@ -132,14 +135,15 @@
 #     </params>
 #     """
 
-(STATE_INITIAL,
-STATE_AUTH,
-STATE_REQUEST,
-STATE_READY,
-STATE_AUTH_USERPASS,
-STATE_CLIENT_INITIAL,
-STATE_CLIENT_AUTH,
-STATE_CLIENT_REQUEST,
+(
+    STATE_INITIAL,
+    STATE_AUTH,
+    STATE_REQUEST,
+    STATE_READY,
+    STATE_AUTH_USERPASS,
+    STATE_CLIENT_INITIAL,
+    STATE_CLIENT_AUTH,
+    STATE_CLIENT_REQUEST,
 ) = xrange(8)
 
 SOCKS5_VER = 0x05
@@ -167,12 +171,21 @@
 REPLY_ADDR_NOT_SUPPORTED = 0x08
 
 
-ProxyInfos = namedtuple("ProxyInfos", ['host', 'jid', 'port'])
+ProxyInfos = namedtuple("ProxyInfos", ["host", "jid", "port"])
 
 
 class Candidate(object):
-
-    def __init__(self, host, port, type_, priority, jid_, id_=None, priority_local=False, factory=None):
+    def __init__(
+        self,
+        host,
+        port,
+        type_,
+        priority,
+        jid_,
+        id_=None,
+        priority_local=False,
+        factory=None,
+    ):
         """
         @param host(unicode): host IP or domain
         @param port(int): port
@@ -184,8 +197,7 @@
             else priority is used as global one (and local priority is set to 0)
         """
         assert isinstance(jid_, jid.JID)
-        self.host, self.port, self.type, self.jid = (
-            host, int(port), type_, jid_)
+        self.host, self.port, self.type, self.jid = (host, int(port), type_, jid_)
         self.id = id_ if id_ is not None else unicode(uuid.uuid4())
         if priority_local:
             self._local_priority = int(priority)
@@ -204,7 +216,7 @@
         try:
             self.factory.discard()
         except AttributeError:
-            pass # no discard for Socks5ServerFactory
+            pass  # no discard for Socks5ServerFactory
 
     @property
     def local_priority(self):
@@ -218,22 +230,25 @@
         # similar to __unicode__ but we don't show jid and we encode id
         return "Candidate ({0.priority}): host={0.host} port={0.port} type={0.type}{id}".format(
             self,
-            id=u" id={}".format(self.id if self.id is not None else u'').encode('utf-8', 'ignore'),
-            )
+            id=u" id={}".format(self.id if self.id is not None else u"").encode(
+                "utf-8", "ignore"
+            ),
+        )
 
     def __unicode__(self):
         return u"Candidate ({0.priority}): host={0.host} port={0.port} jid={0.jid} type={0.type}{id}".format(
-            self,
-            id=u" id={}".format(self.id if self.id is not None else u''),
-            )
+            self, id=u" id={}".format(self.id if self.id is not None else u"")
+        )
 
     def __eq__(self, other):
         # self.id is is not used in __eq__ as the same candidate can have
         # different ids if proposed by initiator or responder
         try:
-            return (self.host == other.host and
-                    self.port == other.port and
-                    self.jid == other.jid)
+            return (
+                self.host == other.host
+                and self.port == other.port
+                and self.jid == other.jid
+            )
         except (AttributeError, TypeError):
             return False
 
@@ -256,7 +271,7 @@
             multiplier = 10
         else:
             raise exceptions.InternalError(u"Unknown {} type !".format(self.type))
-        return 2**16 * multiplier + self._local_priority
+        return 2 ** 16 * multiplier + self._local_priority
 
     def activate(self, sid, peer_jid, client):
         """Activate the proxy candidate
@@ -269,15 +284,15 @@
         """
         assert self.type == XEP_0065.TYPE_PROXY
         iq_elt = client.IQ()
-        iq_elt['to'] = self.jid.full()
-        query_elt = iq_elt.addElement((NS_BS, 'query'))
-        query_elt['sid'] = sid
-        query_elt.addElement('activate', content=peer_jid.full())
+        iq_elt["to"] = self.jid.full()
+        query_elt = iq_elt.addElement((NS_BS, "query"))
+        query_elt["sid"] = sid
+        query_elt.addElement("activate", content=peer_jid.full())
         return iq_elt.send()
 
     def startTransfer(self, session_hash=None):
         if self.type == XEP_0065.TYPE_PROXY:
-            chunk_size = 4096 # Prosody's proxy reject bigger chunks by default
+            chunk_size = 4096  # Prosody's proxy reject bigger chunks by default
         else:
             chunk_size = None
         self.factory.startTransfer(session_hash, chunk_size=chunk_size)
@@ -291,18 +306,20 @@
     @param sid(unicode): session id
     @return (str): hash
     """
-    return hashlib.sha1((sid + requester_jid.full() + target_jid.full()).encode('utf-8')).hexdigest()
+    return hashlib.sha1(
+        (sid + requester_jid.full() + target_jid.full()).encode("utf-8")
+    ).hexdigest()
 
 
 class SOCKSv5(protocol.Protocol):
-    CHUNK_SIZE = 2**16
+    CHUNK_SIZE = 2 ** 16
 
     def __init__(self, session_hash=None):
         """
         @param session_hash(str): hash of the session
             must only be used in client mode
         """
-        self.connection = defer.Deferred() # called when connection/auth is done
+        self.connection = defer.Deferred()  # called when connection/auth is done
         if session_hash is not None:
             self.server_mode = False
             self._session_hash = session_hash
@@ -318,13 +335,13 @@
         self.addressType = 0
         self.requestType = 0
         self._stream_object = None
-        self.active = False # set to True when protocol is actually used for transfer
-                            # used by factories to know when the finished Deferred can be triggered
+        self.active = False  # set to True when protocol is actually used for transfer
+        # used by factories to know when the finished Deferred can be triggered
 
     @property
     def stream_object(self):
         if self._stream_object is None:
-            self._stream_object = self.getSession()['stream_object']
+            self._stream_object = self.getSession()["stream_object"]
             if self.server_mode:
                 self._stream_object.registerProducer(self.transport, True)
         return self._stream_object
@@ -342,22 +359,22 @@
     def _startNegotiation(self):
         log.debug("starting negotiation (client mode)")
         self.state = STATE_CLIENT_AUTH
-        self.transport.write(struct.pack('!3B', SOCKS5_VER, 1, AUTHMECH_ANON))
+        self.transport.write(struct.pack("!3B", SOCKS5_VER, 1, AUTHMECH_ANON))
 
     def _parseNegotiation(self):
         try:
             # Parse out data
-            ver, nmethod = struct.unpack('!BB', self.buf[:2])
-            methods = struct.unpack('%dB' % nmethod, self.buf[2:nmethod + 2])
+            ver, nmethod = struct.unpack("!BB", self.buf[:2])
+            methods = struct.unpack("%dB" % nmethod, self.buf[2 : nmethod + 2])
 
             # Ensure version is correct
             if ver != 5:
-                self.transport.write(struct.pack('!BB', SOCKS5_VER, AUTHMECH_INVALID))
+                self.transport.write(struct.pack("!BB", SOCKS5_VER, AUTHMECH_INVALID))
                 self.transport.loseConnection()
                 return
 
             # Trim off front of the buffer
-            self.buf = self.buf[nmethod + 2:]
+            self.buf = self.buf[nmethod + 2 :]
 
             # Check for supported auth mechs
             for m in self.supportedAuthMechs:
@@ -368,12 +385,12 @@
                     elif m == AUTHMECH_USERPASS:
                         self.state = STATE_AUTH_USERPASS
                     # Complete negotiation w/ this method
-                    self.transport.write(struct.pack('!BB', SOCKS5_VER, m))
+                    self.transport.write(struct.pack("!BB", SOCKS5_VER, m))
                     return
 
             # No supported mechs found, notify client and close the connection
             log.warning(u"Unsupported authentication mechanism")
-            self.transport.write(struct.pack('!BB', SOCKS5_VER, AUTHMECH_INVALID))
+            self.transport.write(struct.pack("!BB", SOCKS5_VER, AUTHMECH_INVALID))
             self.transport.loseConnection()
         except struct.error:
             pass
@@ -381,34 +398,34 @@
     def _parseUserPass(self):
         try:
             # Parse out data
-            ver, ulen = struct.unpack('BB', self.buf[:2])
-            uname, = struct.unpack('%ds' % ulen, self.buf[2:ulen + 2])
-            plen, = struct.unpack('B', self.buf[ulen + 2])
-            password, = struct.unpack('%ds' % plen, self.buf[ulen + 3:ulen + 3 + plen])
+            ver, ulen = struct.unpack("BB", self.buf[:2])
+            uname, = struct.unpack("%ds" % ulen, self.buf[2 : ulen + 2])
+            plen, = struct.unpack("B", self.buf[ulen + 2])
+            password, = struct.unpack("%ds" % plen, self.buf[ulen + 3 : ulen + 3 + plen])
             # Trim off fron of the buffer
-            self.buf = self.buf[3 + ulen + plen:]
+            self.buf = self.buf[3 + ulen + plen :]
             # Fire event to authenticate user
             if self.authenticateUserPass(uname, password):
                 # Signal success
                 self.state = STATE_REQUEST
-                self.transport.write(struct.pack('!BB', SOCKS5_VER, 0x00))
+                self.transport.write(struct.pack("!BB", SOCKS5_VER, 0x00))
             else:
                 # Signal failure
-                self.transport.write(struct.pack('!BB', SOCKS5_VER, 0x01))
+                self.transport.write(struct.pack("!BB", SOCKS5_VER, 0x01))
                 self.transport.loseConnection()
         except struct.error:
             pass
 
     def sendErrorReply(self, errorcode):
         # Any other address types are not supported
-        result = struct.pack('!BBBBIH', SOCKS5_VER, errorcode, 0, 1, 0, 0)
+        result = struct.pack("!BBBBIH", SOCKS5_VER, errorcode, 0, 1, 0, 0)
         self.transport.write(result)
         self.transport.loseConnection()
 
     def _parseRequest(self):
         try:
             # Parse out data and trim buffer accordingly
-            ver, cmd, rsvd, self.addressType = struct.unpack('!BBBB', self.buf[:4])
+            ver, cmd, rsvd, self.addressType = struct.unpack("!BBBB", self.buf[:4])
 
             # Ensure we actually support the requested address type
             if self.addressType not in self.supportedAddrs:
@@ -417,12 +434,12 @@
 
             # Deal with addresses
             if self.addressType == ADDR_IPV4:
-                addr, port = struct.unpack('!IH', self.buf[4:10])
+                addr, port = struct.unpack("!IH", self.buf[4:10])
                 self.buf = self.buf[10:]
             elif self.addressType == ADDR_DOMAINNAME:
                 nlen = ord(self.buf[4])
-                addr, port = struct.unpack('!%dsH' % nlen, self.buf[5:])
-                self.buf = self.buf[7 + len(addr):]
+                addr, port = struct.unpack("!%dsH" % nlen, self.buf[5:])
+                self.buf = self.buf[7 + len(addr) :]
             else:
                 # Any other address types are not supported
                 self.sendErrorReply(REPLY_ADDR_NOT_SUPPORTED)
@@ -449,13 +466,22 @@
 
     def _makeRequest(self):
         hash_ = self._session_hash
-        request = struct.pack('!5B%dsH' % len(hash_), SOCKS5_VER, CMD_CONNECT, 0, ADDR_DOMAINNAME, len(hash_), hash_, 0)
+        request = struct.pack(
+            "!5B%dsH" % len(hash_),
+            SOCKS5_VER,
+            CMD_CONNECT,
+            0,
+            ADDR_DOMAINNAME,
+            len(hash_),
+            hash_,
+            0,
+        )
         self.transport.write(request)
         self.state = STATE_CLIENT_REQUEST
 
     def _parseRequestReply(self):
         try:
-            ver, rep, rsvd, self.addressType = struct.unpack('!BBBB', self.buf[:4])
+            ver, rep, rsvd, self.addressType = struct.unpack("!BBBB", self.buf[:4])
             # Ensure we actually support the requested address type
             if self.addressType not in self.supportedAddrs:
                 self.sendErrorReply(REPLY_ADDR_NOT_SUPPORTED)
@@ -463,12 +489,12 @@
 
             # Deal with addresses
             if self.addressType == ADDR_IPV4:
-                addr, port = struct.unpack('!IH', self.buf[4:10])
+                addr, port = struct.unpack("!IH", self.buf[4:10])
                 self.buf = self.buf[10:]
             elif self.addressType == ADDR_DOMAINNAME:
                 nlen = ord(self.buf[4])
-                addr, port = struct.unpack('!%dsH' % nlen, self.buf[5:])
-                self.buf = self.buf[7 + len(addr):]
+                addr, port = struct.unpack("!%dsH" % nlen, self.buf[5:])
+                self.buf = self.buf[7 + len(addr) :]
             else:
                 # Any other address types are not supported
                 self.sendErrorReply(REPLY_ADDR_NOT_SUPPORTED)
@@ -487,7 +513,11 @@
             return None
 
     def connectionMade(self):
-        log.debug(u"Socks5 connectionMade (mode = {})".format("server" if self.state == STATE_INITIAL else "client"))
+        log.debug(
+            u"Socks5 connectionMade (mode = {})".format(
+                "server" if self.state == STATE_INITIAL else "client"
+            )
+        )
         if self.state == STATE_CLIENT_INITIAL:
             self._startNegotiation()
 
@@ -495,8 +525,11 @@
         # Check that this session is expected
         if not self.factory.addToSession(addr, self):
             self.sendErrorReply(REPLY_CONN_REFUSED)
-            log.warning(u"Unexpected connection request received from {host}"
-                .format(host=self.transport.getPeer().host))
+            log.warning(
+                u"Unexpected connection request received from {host}".format(
+                    host=self.transport.getPeer().host
+                )
+            )
             return
         self._session_hash = addr
         self.connectCompleted(addr, 0)
@@ -519,10 +552,20 @@
 
     def connectCompleted(self, remotehost, remoteport):
         if self.addressType == ADDR_IPV4:
-            result = struct.pack('!BBBBIH', SOCKS5_VER, REPLY_SUCCESS, 0, 1, remotehost, remoteport)
+            result = struct.pack(
+                "!BBBBIH", SOCKS5_VER, REPLY_SUCCESS, 0, 1, remotehost, remoteport
+            )
         elif self.addressType == ADDR_DOMAINNAME:
-            result = struct.pack('!BBBBB%dsH' % len(remotehost), SOCKS5_VER, REPLY_SUCCESS, 0,
-                                 ADDR_DOMAINNAME, len(remotehost), remotehost, remoteport)
+            result = struct.pack(
+                "!BBBBB%dsH" % len(remotehost),
+                SOCKS5_VER,
+                REPLY_SUCCESS,
+                0,
+                ADDR_DOMAINNAME,
+                len(remotehost),
+                remotehost,
+                remoteport,
+            )
         self.transport.write(result)
         self.state = STATE_READY
 
@@ -553,7 +596,7 @@
         if self.state == STATE_CLIENT_REQUEST:
             self._parseRequestReply()
         if self.state == STATE_CLIENT_AUTH:
-            ver, method = struct.unpack('!BB', buf)
+            ver, method = struct.unpack("!BB", buf)
             self.buf = self.buf[2:]
             if ver != SOCKS5_VER or method != AUTHMECH_ANON:
                 self.transport.loseConnection()
@@ -564,7 +607,7 @@
         log.debug(u"Socks5 connection lost: {}".format(reason.value))
         if self.state != STATE_READY:
             self.connection.errback(reason)
-        if self.server_mode :
+        if self.server_mode:
             self.factory.removeFromSession(self._session_hash, self, reason)
 
 
@@ -583,7 +626,7 @@
     def startTransfer(self, session_hash, chunk_size=None):
         session = self.getSession(session_hash)
         try:
-            protocol = session['protocols'][0]
+            protocol = session["protocols"][0]
         except (KeyError, IndexError):
             log.error(u"Can't start file transfer, can't find protocol")
         else:
@@ -603,7 +646,7 @@
         except KeyError:
             return False
         else:
-            session_data.setdefault('protocols', []).append(protocol)
+            session_data.setdefault("protocols", []).append(protocol)
             return True
 
     def removeFromSession(self, session_hash, protocol, reason):
@@ -616,7 +659,7 @@
         @param reason(failure.Failure): reason of the removal
         """
         try:
-            protocols = self.getSession(session_hash)['protocols']
+            protocols = self.getSession(session_hash)["protocols"]
             protocols.remove(protocol)
         except (KeyError, ValueError):
             log.error(u"Protocol not found in session while it should be there")
@@ -686,10 +729,10 @@
 
 class XEP_0065(object):
     NAMESPACE = NS_BS
-    TYPE_DIRECT = 'direct'
-    TYPE_ASSISTED = 'assisted'
-    TYPE_TUNEL = 'tunel'
-    TYPE_PROXY = 'proxy'
+    TYPE_DIRECT = "direct"
+    TYPE_ASSISTED = "assisted"
+    TYPE_TUNEL = "tunel"
+    TYPE_PROXY = "proxy"
     Candidate = Candidate
 
     def __init__(self, host):
@@ -698,16 +741,16 @@
 
         # session data
         self.hash_clients_map = {}  # key: hash of the transfer session, value: session data
-        self._cache_proxies = {} # key: server jid, value: proxy data
+        self._cache_proxies = {}  # key: server jid, value: proxy data
 
         # misc data
         self._server_factory = None
         self._external_port = None
 
         # plugins shortcuts
-        self._ip = self.host.plugins['IP']
+        self._ip = self.host.plugins["IP"]
         try:
-            self._np = self.host.plugins['NAT-PORT']
+            self._np = self.host.plugins["NAT-PORT"]
         except KeyError:
             log.debug(u"NAT Port plugin not available")
             self._np = None
@@ -739,16 +782,22 @@
                 try:
                     listening_port = reactor.listenTCP(port, self._server_factory)
                 except internet_error.CannotListenError as e:
-                    log.debug(u"Cannot listen on port {port}: {err_msg}{err_num}".format(
-                        port=port,
-                        err_msg=e.socketError.strerror,
-                        err_num=u' (error code: {})'.format(e.socketError.errno),
-                        ))
+                    log.debug(
+                        u"Cannot listen on port {port}: {err_msg}{err_num}".format(
+                            port=port,
+                            err_msg=e.socketError.strerror,
+                            err_num=u" (error code: {})".format(e.socketError.errno),
+                        )
+                    )
                 else:
                     self._server_factory_port = listening_port.getHost().port
                     break
 
-            log.info(_("Socks5 Stream server launched on port {}").format(self._server_factory_port))
+            log.info(
+                _("Socks5 Stream server launched on port {}").format(
+                    self._server_factory_port
+                )
+            )
         return self._server_factory
 
     @defer.inlineCallbacks
@@ -759,36 +808,43 @@
         @return ((D)(ProxyInfos, None)): Found proxy infos,
             or None if not acceptable proxy is found
         """
+
         def notFound(server):
             log.info(u"No proxy found on this server")
             self._cache_proxies[server] = None
             defer.returnValue(None)
+
         server = client.jid.host
         try:
             defer.returnValue(self._cache_proxies[server])
         except KeyError:
             pass
         try:
-            proxy = (yield self.host.findServiceEntities(client, 'proxy', 'bytestreams')).pop()
+            proxy = (
+                yield self.host.findServiceEntities(client, "proxy", "bytestreams")
+            ).pop()
         except (defer.CancelledError, StopIteration, KeyError):
             notFound(server)
-        iq_elt = client.IQ('get')
-        iq_elt['to'] = proxy.full()
-        iq_elt.addElement((NS_BS, 'query'))
+        iq_elt = client.IQ("get")
+        iq_elt["to"] = proxy.full()
+        iq_elt.addElement((NS_BS, "query"))
 
         try:
             result_elt = yield iq_elt.send()
         except jabber_error.StanzaError as failure:
-            log.warning(u"Error while requesting proxy info on {jid}: {error}"
-                .format(proxy.full(), failure))
+            log.warning(
+                u"Error while requesting proxy info on {jid}: {error}".format(
+                    proxy.full(), failure
+                )
+            )
             notFound(server)
 
         try:
-            query_elt = result_elt.elements(NS_BS, 'query').next()
-            streamhost_elt = query_elt.elements(NS_BS, 'streamhost').next()
-            host = streamhost_elt['host']
-            jid_ = streamhost_elt['jid']
-            port = streamhost_elt['port']
+            query_elt = result_elt.elements(NS_BS, "query").next()
+            streamhost_elt = query_elt.elements(NS_BS, "streamhost").next()
+            host = streamhost_elt["host"]
+            jid_ = streamhost_elt["jid"]
+            port = streamhost_elt["port"]
             if not all((host, jid, port)):
                 raise KeyError
             jid_ = jid.JID(jid_)
@@ -818,7 +874,9 @@
                 if self._np is None:
                     log.warning(u"NAT port plugin not available, we can't map port")
                 else:
-                    ext_port = yield self._np.mapPort(local_port, desc=u"SaT socks5 stream")
+                    ext_port = yield self._np.mapPort(
+                        local_port, desc=u"SaT socks5 stream"
+                    )
                     if ext_port is None:
                         log.warning(u"Can't map NAT port")
                     else:
@@ -843,17 +901,56 @@
 
         # the preferred direct connection
         ip = local_ips.pop(0)
-        candidates.append(Candidate(ip, local_port, XEP_0065.TYPE_DIRECT, PRIORITY_BEST_DIRECT, client.jid, priority_local=True, factory=server_factory))
+        candidates.append(
+            Candidate(
+                ip,
+                local_port,
+                XEP_0065.TYPE_DIRECT,
+                PRIORITY_BEST_DIRECT,
+                client.jid,
+                priority_local=True,
+                factory=server_factory,
+            )
+        )
         for ip in local_ips:
-            candidates.append(Candidate(ip, local_port, XEP_0065.TYPE_DIRECT, PRIORITY_DIRECT, client.jid, priority_local=True, factory=server_factory))
+            candidates.append(
+                Candidate(
+                    ip,
+                    local_port,
+                    XEP_0065.TYPE_DIRECT,
+                    PRIORITY_DIRECT,
+                    client.jid,
+                    priority_local=True,
+                    factory=server_factory,
+                )
+            )
 
         # then the assisted one
         if ext_port is not None:
-            candidates.append(Candidate(external_ip, ext_port, XEP_0065.TYPE_ASSISTED, PRIORITY_ASSISTED, client.jid, priority_local=True, factory=server_factory))
+            candidates.append(
+                Candidate(
+                    external_ip,
+                    ext_port,
+                    XEP_0065.TYPE_ASSISTED,
+                    PRIORITY_ASSISTED,
+                    client.jid,
+                    priority_local=True,
+                    factory=server_factory,
+                )
+            )
 
         # finally the proxy
         if proxy:
-            candidates.append(Candidate(proxy.host, proxy.port, XEP_0065.TYPE_PROXY, PRIORITY_PROXY, proxy.jid, priority_local=True))
+            candidates.append(
+                Candidate(
+                    proxy.host,
+                    proxy.port,
+                    XEP_0065.TYPE_PROXY,
+                    PRIORITY_PROXY,
+                    proxy.jid,
+                    priority_local=True,
+                )
+            )
 
         # should be already sorted, but just in case the priorities get weird
         candidates.sort(key=lambda c: c.priority, reverse=True)
@@ -870,7 +967,9 @@
         candidate.factory.connector = connector
         return candidate.factory.connection
 
-    def connectCandidate(self, client, candidate, session_hash, peer_session_hash=None, delay=None):
+    def connectCandidate(
+        self, client, candidate, session_hash, peer_session_hash=None, delay=None
+    ):
         """Connect to a candidate
 
         Connection will be done with a Socks5ClientFactory
@@ -900,16 +999,30 @@
         d.addCallback(self._addConnector, candidate)
         return d
 
-    def tryCandidates(self, client, candidates, session_hash, peer_session_hash, connection_cb=None, connection_eb=None):
+    def tryCandidates(
+        self,
+        client,
+        candidates,
+        session_hash,
+        peer_session_hash,
+        connection_cb=None,
+        connection_eb=None,
+    ):
         defers_list = []
 
         for candidate in candidates:
             delay = CANDIDATE_DELAY * len(defers_list)
             if candidate.type == XEP_0065.TYPE_PROXY:
                 delay += CANDIDATE_DELAY_PROXY
-            d = self.connectCandidate(client, candidate, session_hash, peer_session_hash, delay)
+            d = self.connectCandidate(
+                client, candidate, session_hash, peer_session_hash, delay
+            )
             if connection_cb is not None:
-                d.addCallback(lambda dummy, candidate=candidate, client=client: connection_cb(client, candidate))
+                d.addCallback(
+                    lambda dummy, candidate=candidate, client=client: connection_cb(
+                        client, candidate
+                    )
+                )
             if connection_eb is not None:
                 d.addErrback(connection_eb, client, candidate)
             defers_list.append(d)
@@ -942,9 +1055,11 @@
             if failure.check(defer.CancelledError):
                 log.debug(u"Connection of {} has been cancelled".format(candidate))
             else:
-                log.info(u"Connection of {candidate} Failed: {error}".format(
-                    candidate = candidate,
-                    error = failure.value))
+                log.info(
+                    u"Connection of {candidate} Failed: {error}".format(
+                        candidate=candidate, error=failure.value
+                    )
+                )
             candidates[candidates.index(candidate)] = None
 
         def allTested(self):
@@ -952,7 +1067,14 @@
             good_candidates = [c for c in candidates if c]
             return good_candidates[0] if good_candidates else None
 
-        defer_candidates = self.tryCandidates(client, candidates, session_hash, peer_session_hash, connectionCb, connectionEb)
+        defer_candidates = self.tryCandidates(
+            client,
+            candidates,
+            session_hash,
+            peer_session_hash,
+            connectionCb,
+            connectionEb,
+        )
         d_list = defer.DeferredList(defer_candidates)
         d_list.addCallback(allTested)
         return d_list
@@ -977,11 +1099,13 @@
         @param failure_(None, failure.Failure): None if eveything was fine, a failure else
         @return (None, failure.Failure): failure_ is returned
         """
-        log.debug(u'Cleaning session with hash {hash}{id}: {reason}'.format(
-            hash=session_hash,
-            reason='' if failure_ is None else failure_.value,
-            id='' if sid is None else u' (id: {})'.format(sid),
-            ))
+        log.debug(
+            u"Cleaning session with hash {hash}{id}: {reason}".format(
+                hash=session_hash,
+                reason="" if failure_ is None else failure_.value,
+                id="" if sid is None else u" (id: {})".format(sid),
+            )
+        )
 
         try:
             assert self.hash_clients_map[session_hash] == client
@@ -1004,7 +1128,7 @@
             del client._s5b_sessions[session_hash]
 
         try:
-            session_data['timer'].cancel()
+            session_data["timer"].cancel()
         except (internet_error.AlreadyCalled, internet_error.AlreadyCancelled):
             pass
 
@@ -1025,19 +1149,19 @@
         session_data[client] = client
 
         def gotCandidates(candidates):
-            session_data['candidates'] = candidates
+            session_data["candidates"] = candidates
             iq_elt = client.IQ()
             iq_elt["from"] = client.jid.full()
             iq_elt["to"] = to_jid.full()
-            query_elt = iq_elt.addElement((NS_BS, 'query'))
-            query_elt['mode'] = 'tcp'
-            query_elt['sid'] = sid
+            query_elt = iq_elt.addElement((NS_BS, "query"))
+            query_elt["mode"] = "tcp"
+            query_elt["sid"] = sid
 
             for candidate in candidates:
-                streamhost = query_elt.addElement('streamhost')
-                streamhost['host'] = candidate.host
-                streamhost['port'] = str(candidate.port)
-                streamhost['jid'] = candidate.jid.full()
+                streamhost = query_elt.addElement("streamhost")
+                streamhost["host"] = candidate.host
+                streamhost["port"] = str(candidate.port)
+                streamhost["jid"] = candidate.jid.full()
                 log.debug(u"Candidate proposed: {}".format(candidate))
 
             d = iq_elt.send()
@@ -1055,31 +1179,39 @@
         @param iq_elt(domish.Element): <iq> result
         """
         try:
-            query_elt = iq_elt.elements(NS_BS, 'query').next()
-            streamhost_used_elt = query_elt.elements(NS_BS, 'streamhost-used').next()
+            query_elt = iq_elt.elements(NS_BS, "query").next()
+            streamhost_used_elt = query_elt.elements(NS_BS, "streamhost-used").next()
         except StopIteration:
             log.warning(u"No streamhost found in stream query")
             # FIXME: must clean session
             return
 
-        streamhost_jid = jid.JID(streamhost_used_elt['jid'])
+        streamhost_jid = jid.JID(streamhost_used_elt["jid"])
         try:
-            candidate = (c for c in session_data['candidates'] if c.jid == streamhost_jid).next()
+            candidate = (
+                c for c in session_data["candidates"] if c.jid == streamhost_jid
+            ).next()
         except StopIteration:
-            log.warning(u"Candidate [{jid}] is unknown !".format(jid=streamhost_jid.full()))
+            log.warning(
+                u"Candidate [{jid}] is unknown !".format(jid=streamhost_jid.full())
+            )
             return
         else:
             log.info(u"Candidate choosed by target: {}".format(candidate))
 
         if candidate.type == XEP_0065.TYPE_PROXY:
             log.info(u"A Socks5 proxy is used")
-            d = self.connectCandidate(client, candidate, session_data['hash'])
-            d.addCallback(lambda dummy: candidate.activate(session_data['id'], session_data['peer_jid'], client))
+            d = self.connectCandidate(client, candidate, session_data["hash"])
+            d.addCallback(
+                lambda dummy: candidate.activate(
+                    session_data["id"], session_data["peer_jid"], client
+                )
+            )
             d.addErrback(self._activationEb)
         else:
             d = defer.succeed(None)
 
-        d.addCallback(lambda dummy: candidate.startTransfer(session_data['hash']))
+        d.addCallback(lambda dummy: candidate.startTransfer(session_data["hash"]))
 
     def _activationEb(self, failure):
         log.warning(u"Proxy activation error: {}".format(failure.value))
@@ -1105,7 +1237,7 @@
         @return (dict): session data
         """
         if sid in client.xep_0065_sid_session:
-            raise exceptions.ConflictError(u'A session with this id already exists !')
+            raise exceptions.ConflictError(u"A session with this id already exists !")
         if requester:
             session_hash = getSessionHash(client.jid, to_jid, sid)
             session_data = self._registerHash(client, session_hash, stream_object)
@@ -1115,15 +1247,19 @@
             session_d.addBoth(self.killSession, session_hash, sid, client)
             session_data = client._s5b_sessions[session_hash] = {
                 DEFER_KEY: session_d,
-                TIMER_KEY: reactor.callLater(TIMEOUT, self._timeOut, session_hash, client),
-                }
+                TIMER_KEY: reactor.callLater(
+                    TIMEOUT, self._timeOut, session_hash, client
+                ),
+            }
         client.xep_0065_sid_session[sid] = session_data
         session_data.update(
-            {'id': sid,
-             'peer_jid': to_jid,
-             'stream_object': stream_object,
-             'hash': session_hash,
-            })
+            {
+                "id": sid,
+                "peer_jid": to_jid,
+                "stream_object": stream_object,
+                "hash": session_hash,
+            }
+        )
 
         return session_data
 
@@ -1141,7 +1277,7 @@
         """
         if client is None:
             try:
-                client =  self.hash_clients_map[session_hash]
+                client = self.hash_clients_map[session_hash]
             except KeyError as e:
                 log.warning(u"The requested session doesn't exists !")
                 raise e
@@ -1167,10 +1303,10 @@
         session_data = client._s5b_sessions[session_hash] = {
             DEFER_KEY: session_d,
             TIMER_KEY: reactor.callLater(TIMEOUT, self._timeOut, session_hash, client),
-            }
+        }
 
         if stream_object is not None:
-            session_data['stream_object'] = stream_object
+            session_data["stream_object"] = stream_object
 
         assert session_hash not in self.hash_clients_map
         self.hash_clients_map[session_hash] = client
@@ -1180,22 +1316,22 @@
     def associateStreamObject(self, client, session_hash, stream_object):
         """Associate a stream object with  a session"""
         session_data = self.getSession(client, session_hash)
-        assert 'stream_object' not in session_data
-        session_data['stream_object'] = stream_object
+        assert "stream_object" not in session_data
+        session_data["stream_object"] = stream_object
 
     def streamQuery(self, iq_elt, client):
         log.debug(u"BS stream query")
 
         iq_elt.handled = True
 
-        query_elt = iq_elt.elements(NS_BS, 'query').next()
+        query_elt = iq_elt.elements(NS_BS, "query").next()
         try:
-            sid = query_elt['sid']
+            sid = query_elt["sid"]
         except KeyError:
             log.warning(u"Invalid bystreams request received")
             return client.sendError(iq_elt, "bad-request")
 
-        streamhost_elts = list(query_elt.elements(NS_BS, 'streamhost'))
+        streamhost_elts = list(query_elt.elements(NS_BS, "streamhost"))
         if not streamhost_elts:
             return client.sendError(iq_elt, "bad-request")
 
@@ -1203,7 +1339,7 @@
             session_data = client.xep_0065_sid_session[sid]
         except KeyError:
             log.warning(u"Ignoring unexpected BS transfer: {}".format(sid))
-            return client.sendError(iq_elt, 'not-acceptable')
+            return client.sendError(iq_elt, "not-acceptable")
 
         peer_jid = session_data["peer_jid"] = jid.JID(iq_elt["from"])
 
@@ -1211,7 +1347,7 @@
         nb_sh = len(streamhost_elts)
         for idx, sh_elt in enumerate(streamhost_elts):
             try:
-                host, port, jid_ = sh_elt['host'], sh_elt['port'], jid.JID(sh_elt['jid'])
+                host, port, jid_ = sh_elt["host"], sh_elt["port"], jid.JID(sh_elt["jid"])
             except KeyError:
                 log.warning(u"malformed streamhost element")
                 return client.sendError(iq_elt, "bad-request")
@@ -1225,19 +1361,19 @@
         for candidate in candidates:
             log.info(u"Candidate proposed: {}".format(candidate))
 
-        d = self.getBestCandidate(client, candidates, session_data['hash'])
+        d = self.getBestCandidate(client, candidates, session_data["hash"])
         d.addCallback(self._ackStream, iq_elt, session_data, client)
 
     def _ackStream(self, candidate, iq_elt, session_data, client):
         if candidate is None:
             log.info("No streamhost candidate worked, we have to end negotiation")
-            return client.sendError(iq_elt, 'item-not-found')
+            return client.sendError(iq_elt, "item-not-found")
         log.info(u"We choose: {}".format(candidate))
-        result_elt = xmlstream.toResponse(iq_elt, 'result')
-        query_elt = result_elt.addElement((NS_BS, 'query'))
-        query_elt['sid'] = session_data['id']
-        streamhost_used_elt = query_elt.addElement('streamhost-used')
-        streamhost_used_elt['jid'] = candidate.jid.full()
+        result_elt = xmlstream.toResponse(iq_elt, "result")
+        query_elt = result_elt.addElement((NS_BS, "query"))
+        query_elt["sid"] = session_data["id"]
+        streamhost_used_elt = query_elt.addElement("streamhost-used")
+        streamhost_used_elt["jid"] = candidate.jid.full()
         client.send(result_elt)
 
 
@@ -1249,10 +1385,12 @@
         self.host = plugin_parent.host
 
     def connectionInitialized(self):
-        self.xmlstream.addObserver(BS_REQUEST, self.plugin_parent.streamQuery, client=self.parent)
+        self.xmlstream.addObserver(
+            BS_REQUEST, self.plugin_parent.streamQuery, client=self.parent
+        )
 
-    def getDiscoInfo(self, requestor, target, nodeIdentifier=''):
+    def getDiscoInfo(self, requestor, target, nodeIdentifier=""):
         return [disco.DiscoFeature(NS_BS)]
 
-    def getDiscoItems(self, requestor, target, nodeIdentifier=''):
+    def getDiscoItems(self, requestor, target, nodeIdentifier=""):
         return []