Mercurial > libervia-backend
diff sat/plugins/plugin_xep_0065.py @ 2624:56f94936df1e
code style reformatting using black
author | Goffi <goffi@goffi.org> |
---|---|
date | Wed, 27 Jun 2018 20:14:46 +0200 |
parents | 26edcf3a30eb |
children | 378188abe941 |
line wrap: on
line diff
--- a/sat/plugins/plugin_xep_0065.py Wed Jun 27 07:51:29 2018 +0200 +++ b/sat/plugins/plugin_xep_0065.py Wed Jun 27 20:14:46 2018 +0200 @@ -1,5 +1,5 @@ #!/usr/bin/env python2 -#-*- coding: utf-8 -*- +# -*- coding: utf-8 -*- # SAT plugin for managing xep-0065 @@ -56,6 +56,7 @@ from sat.core.i18n import _ from sat.core.log import getLogger + log = getLogger(__name__) from sat.core.constants import Const as C from sat.core import exceptions @@ -92,25 +93,27 @@ C.PI_RECOMMENDATIONS: ["NAT-PORT"], C.PI_MAIN: "XEP_0065", C.PI_HANDLER: "yes", - C.PI_DESCRIPTION: _("""Implementation of SOCKS5 Bytestreams""") + C.PI_DESCRIPTION: _("""Implementation of SOCKS5 Bytestreams"""), } IQ_SET = '/iq[@type="set"]' -NS_BS = 'http://jabber.org/protocol/bytestreams' +NS_BS = "http://jabber.org/protocol/bytestreams" BS_REQUEST = IQ_SET + '/query[@xmlns="' + NS_BS + '"]' -TIMER_KEY = 'timer' -DEFER_KEY = 'finished' # key of the deferred used to track session end -SERVER_STARTING_PORT = 0 # starting number for server port search (0 to ask automatic attribution) +TIMER_KEY = "timer" +DEFER_KEY = "finished" # key of the deferred used to track session end +SERVER_STARTING_PORT = ( + 0 +) # starting number for server port search (0 to ask automatic attribution) # priorities are candidates local priorities, must be a int between 0 and 65535 PRIORITY_BEST_DIRECT = 10000 PRIORITY_DIRECT = 5000 PRIORITY_ASSISTED = 1000 -PRIORITY_PROXY = 0.2 # proxy is the last option for s5b -CANDIDATE_DELAY = 0.2 # see XEP-0260 §4 -CANDIDATE_DELAY_PROXY = 0.2 # additional time for proxy types (see XEP-0260 §4 note 3) +PRIORITY_PROXY = 0.2 # proxy is the last option for s5b +CANDIDATE_DELAY = 0.2 # see XEP-0260 §4 +CANDIDATE_DELAY_PROXY = 0.2 # additional time for proxy types (see XEP-0260 §4 note 3) -TIMEOUT = 300 # maxium time between session creation and stream start +TIMEOUT = 300 # maxium time between session creation and stream start # XXX: by default eveything is automatic # TODO: use these params to force use of specific proxy/port/IP @@ -132,14 +135,15 @@ # </params> # """ -(STATE_INITIAL, -STATE_AUTH, -STATE_REQUEST, -STATE_READY, -STATE_AUTH_USERPASS, -STATE_CLIENT_INITIAL, -STATE_CLIENT_AUTH, -STATE_CLIENT_REQUEST, +( + STATE_INITIAL, + STATE_AUTH, + STATE_REQUEST, + STATE_READY, + STATE_AUTH_USERPASS, + STATE_CLIENT_INITIAL, + STATE_CLIENT_AUTH, + STATE_CLIENT_REQUEST, ) = xrange(8) SOCKS5_VER = 0x05 @@ -167,12 +171,21 @@ REPLY_ADDR_NOT_SUPPORTED = 0x08 -ProxyInfos = namedtuple("ProxyInfos", ['host', 'jid', 'port']) +ProxyInfos = namedtuple("ProxyInfos", ["host", "jid", "port"]) class Candidate(object): - - def __init__(self, host, port, type_, priority, jid_, id_=None, priority_local=False, factory=None): + def __init__( + self, + host, + port, + type_, + priority, + jid_, + id_=None, + priority_local=False, + factory=None, + ): """ @param host(unicode): host IP or domain @param port(int): port @@ -184,8 +197,7 @@ else priority is used as global one (and local priority is set to 0) """ assert isinstance(jid_, jid.JID) - self.host, self.port, self.type, self.jid = ( - host, int(port), type_, jid_) + self.host, self.port, self.type, self.jid = (host, int(port), type_, jid_) self.id = id_ if id_ is not None else unicode(uuid.uuid4()) if priority_local: self._local_priority = int(priority) @@ -204,7 +216,7 @@ try: self.factory.discard() except AttributeError: - pass # no discard for Socks5ServerFactory + pass # no discard for Socks5ServerFactory @property def local_priority(self): @@ -218,22 +230,25 @@ # similar to __unicode__ but we don't show jid and we encode id return "Candidate ({0.priority}): host={0.host} port={0.port} type={0.type}{id}".format( self, - id=u" id={}".format(self.id if self.id is not None else u'').encode('utf-8', 'ignore'), - ) + id=u" id={}".format(self.id if self.id is not None else u"").encode( + "utf-8", "ignore" + ), + ) def __unicode__(self): return u"Candidate ({0.priority}): host={0.host} port={0.port} jid={0.jid} type={0.type}{id}".format( - self, - id=u" id={}".format(self.id if self.id is not None else u''), - ) + self, id=u" id={}".format(self.id if self.id is not None else u"") + ) def __eq__(self, other): # self.id is is not used in __eq__ as the same candidate can have # different ids if proposed by initiator or responder try: - return (self.host == other.host and - self.port == other.port and - self.jid == other.jid) + return ( + self.host == other.host + and self.port == other.port + and self.jid == other.jid + ) except (AttributeError, TypeError): return False @@ -256,7 +271,7 @@ multiplier = 10 else: raise exceptions.InternalError(u"Unknown {} type !".format(self.type)) - return 2**16 * multiplier + self._local_priority + return 2 ** 16 * multiplier + self._local_priority def activate(self, sid, peer_jid, client): """Activate the proxy candidate @@ -269,15 +284,15 @@ """ assert self.type == XEP_0065.TYPE_PROXY iq_elt = client.IQ() - iq_elt['to'] = self.jid.full() - query_elt = iq_elt.addElement((NS_BS, 'query')) - query_elt['sid'] = sid - query_elt.addElement('activate', content=peer_jid.full()) + iq_elt["to"] = self.jid.full() + query_elt = iq_elt.addElement((NS_BS, "query")) + query_elt["sid"] = sid + query_elt.addElement("activate", content=peer_jid.full()) return iq_elt.send() def startTransfer(self, session_hash=None): if self.type == XEP_0065.TYPE_PROXY: - chunk_size = 4096 # Prosody's proxy reject bigger chunks by default + chunk_size = 4096 # Prosody's proxy reject bigger chunks by default else: chunk_size = None self.factory.startTransfer(session_hash, chunk_size=chunk_size) @@ -291,18 +306,20 @@ @param sid(unicode): session id @return (str): hash """ - return hashlib.sha1((sid + requester_jid.full() + target_jid.full()).encode('utf-8')).hexdigest() + return hashlib.sha1( + (sid + requester_jid.full() + target_jid.full()).encode("utf-8") + ).hexdigest() class SOCKSv5(protocol.Protocol): - CHUNK_SIZE = 2**16 + CHUNK_SIZE = 2 ** 16 def __init__(self, session_hash=None): """ @param session_hash(str): hash of the session must only be used in client mode """ - self.connection = defer.Deferred() # called when connection/auth is done + self.connection = defer.Deferred() # called when connection/auth is done if session_hash is not None: self.server_mode = False self._session_hash = session_hash @@ -318,13 +335,13 @@ self.addressType = 0 self.requestType = 0 self._stream_object = None - self.active = False # set to True when protocol is actually used for transfer - # used by factories to know when the finished Deferred can be triggered + self.active = False # set to True when protocol is actually used for transfer + # used by factories to know when the finished Deferred can be triggered @property def stream_object(self): if self._stream_object is None: - self._stream_object = self.getSession()['stream_object'] + self._stream_object = self.getSession()["stream_object"] if self.server_mode: self._stream_object.registerProducer(self.transport, True) return self._stream_object @@ -342,22 +359,22 @@ def _startNegotiation(self): log.debug("starting negotiation (client mode)") self.state = STATE_CLIENT_AUTH - self.transport.write(struct.pack('!3B', SOCKS5_VER, 1, AUTHMECH_ANON)) + self.transport.write(struct.pack("!3B", SOCKS5_VER, 1, AUTHMECH_ANON)) def _parseNegotiation(self): try: # Parse out data - ver, nmethod = struct.unpack('!BB', self.buf[:2]) - methods = struct.unpack('%dB' % nmethod, self.buf[2:nmethod + 2]) + ver, nmethod = struct.unpack("!BB", self.buf[:2]) + methods = struct.unpack("%dB" % nmethod, self.buf[2 : nmethod + 2]) # Ensure version is correct if ver != 5: - self.transport.write(struct.pack('!BB', SOCKS5_VER, AUTHMECH_INVALID)) + self.transport.write(struct.pack("!BB", SOCKS5_VER, AUTHMECH_INVALID)) self.transport.loseConnection() return # Trim off front of the buffer - self.buf = self.buf[nmethod + 2:] + self.buf = self.buf[nmethod + 2 :] # Check for supported auth mechs for m in self.supportedAuthMechs: @@ -368,12 +385,12 @@ elif m == AUTHMECH_USERPASS: self.state = STATE_AUTH_USERPASS # Complete negotiation w/ this method - self.transport.write(struct.pack('!BB', SOCKS5_VER, m)) + self.transport.write(struct.pack("!BB", SOCKS5_VER, m)) return # No supported mechs found, notify client and close the connection log.warning(u"Unsupported authentication mechanism") - self.transport.write(struct.pack('!BB', SOCKS5_VER, AUTHMECH_INVALID)) + self.transport.write(struct.pack("!BB", SOCKS5_VER, AUTHMECH_INVALID)) self.transport.loseConnection() except struct.error: pass @@ -381,34 +398,34 @@ def _parseUserPass(self): try: # Parse out data - ver, ulen = struct.unpack('BB', self.buf[:2]) - uname, = struct.unpack('%ds' % ulen, self.buf[2:ulen + 2]) - plen, = struct.unpack('B', self.buf[ulen + 2]) - password, = struct.unpack('%ds' % plen, self.buf[ulen + 3:ulen + 3 + plen]) + ver, ulen = struct.unpack("BB", self.buf[:2]) + uname, = struct.unpack("%ds" % ulen, self.buf[2 : ulen + 2]) + plen, = struct.unpack("B", self.buf[ulen + 2]) + password, = struct.unpack("%ds" % plen, self.buf[ulen + 3 : ulen + 3 + plen]) # Trim off fron of the buffer - self.buf = self.buf[3 + ulen + plen:] + self.buf = self.buf[3 + ulen + plen :] # Fire event to authenticate user if self.authenticateUserPass(uname, password): # Signal success self.state = STATE_REQUEST - self.transport.write(struct.pack('!BB', SOCKS5_VER, 0x00)) + self.transport.write(struct.pack("!BB", SOCKS5_VER, 0x00)) else: # Signal failure - self.transport.write(struct.pack('!BB', SOCKS5_VER, 0x01)) + self.transport.write(struct.pack("!BB", SOCKS5_VER, 0x01)) self.transport.loseConnection() except struct.error: pass def sendErrorReply(self, errorcode): # Any other address types are not supported - result = struct.pack('!BBBBIH', SOCKS5_VER, errorcode, 0, 1, 0, 0) + result = struct.pack("!BBBBIH", SOCKS5_VER, errorcode, 0, 1, 0, 0) self.transport.write(result) self.transport.loseConnection() def _parseRequest(self): try: # Parse out data and trim buffer accordingly - ver, cmd, rsvd, self.addressType = struct.unpack('!BBBB', self.buf[:4]) + ver, cmd, rsvd, self.addressType = struct.unpack("!BBBB", self.buf[:4]) # Ensure we actually support the requested address type if self.addressType not in self.supportedAddrs: @@ -417,12 +434,12 @@ # Deal with addresses if self.addressType == ADDR_IPV4: - addr, port = struct.unpack('!IH', self.buf[4:10]) + addr, port = struct.unpack("!IH", self.buf[4:10]) self.buf = self.buf[10:] elif self.addressType == ADDR_DOMAINNAME: nlen = ord(self.buf[4]) - addr, port = struct.unpack('!%dsH' % nlen, self.buf[5:]) - self.buf = self.buf[7 + len(addr):] + addr, port = struct.unpack("!%dsH" % nlen, self.buf[5:]) + self.buf = self.buf[7 + len(addr) :] else: # Any other address types are not supported self.sendErrorReply(REPLY_ADDR_NOT_SUPPORTED) @@ -449,13 +466,22 @@ def _makeRequest(self): hash_ = self._session_hash - request = struct.pack('!5B%dsH' % len(hash_), SOCKS5_VER, CMD_CONNECT, 0, ADDR_DOMAINNAME, len(hash_), hash_, 0) + request = struct.pack( + "!5B%dsH" % len(hash_), + SOCKS5_VER, + CMD_CONNECT, + 0, + ADDR_DOMAINNAME, + len(hash_), + hash_, + 0, + ) self.transport.write(request) self.state = STATE_CLIENT_REQUEST def _parseRequestReply(self): try: - ver, rep, rsvd, self.addressType = struct.unpack('!BBBB', self.buf[:4]) + ver, rep, rsvd, self.addressType = struct.unpack("!BBBB", self.buf[:4]) # Ensure we actually support the requested address type if self.addressType not in self.supportedAddrs: self.sendErrorReply(REPLY_ADDR_NOT_SUPPORTED) @@ -463,12 +489,12 @@ # Deal with addresses if self.addressType == ADDR_IPV4: - addr, port = struct.unpack('!IH', self.buf[4:10]) + addr, port = struct.unpack("!IH", self.buf[4:10]) self.buf = self.buf[10:] elif self.addressType == ADDR_DOMAINNAME: nlen = ord(self.buf[4]) - addr, port = struct.unpack('!%dsH' % nlen, self.buf[5:]) - self.buf = self.buf[7 + len(addr):] + addr, port = struct.unpack("!%dsH" % nlen, self.buf[5:]) + self.buf = self.buf[7 + len(addr) :] else: # Any other address types are not supported self.sendErrorReply(REPLY_ADDR_NOT_SUPPORTED) @@ -487,7 +513,11 @@ return None def connectionMade(self): - log.debug(u"Socks5 connectionMade (mode = {})".format("server" if self.state == STATE_INITIAL else "client")) + log.debug( + u"Socks5 connectionMade (mode = {})".format( + "server" if self.state == STATE_INITIAL else "client" + ) + ) if self.state == STATE_CLIENT_INITIAL: self._startNegotiation() @@ -495,8 +525,11 @@ # Check that this session is expected if not self.factory.addToSession(addr, self): self.sendErrorReply(REPLY_CONN_REFUSED) - log.warning(u"Unexpected connection request received from {host}" - .format(host=self.transport.getPeer().host)) + log.warning( + u"Unexpected connection request received from {host}".format( + host=self.transport.getPeer().host + ) + ) return self._session_hash = addr self.connectCompleted(addr, 0) @@ -519,10 +552,20 @@ def connectCompleted(self, remotehost, remoteport): if self.addressType == ADDR_IPV4: - result = struct.pack('!BBBBIH', SOCKS5_VER, REPLY_SUCCESS, 0, 1, remotehost, remoteport) + result = struct.pack( + "!BBBBIH", SOCKS5_VER, REPLY_SUCCESS, 0, 1, remotehost, remoteport + ) elif self.addressType == ADDR_DOMAINNAME: - result = struct.pack('!BBBBB%dsH' % len(remotehost), SOCKS5_VER, REPLY_SUCCESS, 0, - ADDR_DOMAINNAME, len(remotehost), remotehost, remoteport) + result = struct.pack( + "!BBBBB%dsH" % len(remotehost), + SOCKS5_VER, + REPLY_SUCCESS, + 0, + ADDR_DOMAINNAME, + len(remotehost), + remotehost, + remoteport, + ) self.transport.write(result) self.state = STATE_READY @@ -553,7 +596,7 @@ if self.state == STATE_CLIENT_REQUEST: self._parseRequestReply() if self.state == STATE_CLIENT_AUTH: - ver, method = struct.unpack('!BB', buf) + ver, method = struct.unpack("!BB", buf) self.buf = self.buf[2:] if ver != SOCKS5_VER or method != AUTHMECH_ANON: self.transport.loseConnection() @@ -564,7 +607,7 @@ log.debug(u"Socks5 connection lost: {}".format(reason.value)) if self.state != STATE_READY: self.connection.errback(reason) - if self.server_mode : + if self.server_mode: self.factory.removeFromSession(self._session_hash, self, reason) @@ -583,7 +626,7 @@ def startTransfer(self, session_hash, chunk_size=None): session = self.getSession(session_hash) try: - protocol = session['protocols'][0] + protocol = session["protocols"][0] except (KeyError, IndexError): log.error(u"Can't start file transfer, can't find protocol") else: @@ -603,7 +646,7 @@ except KeyError: return False else: - session_data.setdefault('protocols', []).append(protocol) + session_data.setdefault("protocols", []).append(protocol) return True def removeFromSession(self, session_hash, protocol, reason): @@ -616,7 +659,7 @@ @param reason(failure.Failure): reason of the removal """ try: - protocols = self.getSession(session_hash)['protocols'] + protocols = self.getSession(session_hash)["protocols"] protocols.remove(protocol) except (KeyError, ValueError): log.error(u"Protocol not found in session while it should be there") @@ -686,10 +729,10 @@ class XEP_0065(object): NAMESPACE = NS_BS - TYPE_DIRECT = 'direct' - TYPE_ASSISTED = 'assisted' - TYPE_TUNEL = 'tunel' - TYPE_PROXY = 'proxy' + TYPE_DIRECT = "direct" + TYPE_ASSISTED = "assisted" + TYPE_TUNEL = "tunel" + TYPE_PROXY = "proxy" Candidate = Candidate def __init__(self, host): @@ -698,16 +741,16 @@ # session data self.hash_clients_map = {} # key: hash of the transfer session, value: session data - self._cache_proxies = {} # key: server jid, value: proxy data + self._cache_proxies = {} # key: server jid, value: proxy data # misc data self._server_factory = None self._external_port = None # plugins shortcuts - self._ip = self.host.plugins['IP'] + self._ip = self.host.plugins["IP"] try: - self._np = self.host.plugins['NAT-PORT'] + self._np = self.host.plugins["NAT-PORT"] except KeyError: log.debug(u"NAT Port plugin not available") self._np = None @@ -739,16 +782,22 @@ try: listening_port = reactor.listenTCP(port, self._server_factory) except internet_error.CannotListenError as e: - log.debug(u"Cannot listen on port {port}: {err_msg}{err_num}".format( - port=port, - err_msg=e.socketError.strerror, - err_num=u' (error code: {})'.format(e.socketError.errno), - )) + log.debug( + u"Cannot listen on port {port}: {err_msg}{err_num}".format( + port=port, + err_msg=e.socketError.strerror, + err_num=u" (error code: {})".format(e.socketError.errno), + ) + ) else: self._server_factory_port = listening_port.getHost().port break - log.info(_("Socks5 Stream server launched on port {}").format(self._server_factory_port)) + log.info( + _("Socks5 Stream server launched on port {}").format( + self._server_factory_port + ) + ) return self._server_factory @defer.inlineCallbacks @@ -759,36 +808,43 @@ @return ((D)(ProxyInfos, None)): Found proxy infos, or None if not acceptable proxy is found """ + def notFound(server): log.info(u"No proxy found on this server") self._cache_proxies[server] = None defer.returnValue(None) + server = client.jid.host try: defer.returnValue(self._cache_proxies[server]) except KeyError: pass try: - proxy = (yield self.host.findServiceEntities(client, 'proxy', 'bytestreams')).pop() + proxy = ( + yield self.host.findServiceEntities(client, "proxy", "bytestreams") + ).pop() except (defer.CancelledError, StopIteration, KeyError): notFound(server) - iq_elt = client.IQ('get') - iq_elt['to'] = proxy.full() - iq_elt.addElement((NS_BS, 'query')) + iq_elt = client.IQ("get") + iq_elt["to"] = proxy.full() + iq_elt.addElement((NS_BS, "query")) try: result_elt = yield iq_elt.send() except jabber_error.StanzaError as failure: - log.warning(u"Error while requesting proxy info on {jid}: {error}" - .format(proxy.full(), failure)) + log.warning( + u"Error while requesting proxy info on {jid}: {error}".format( + proxy.full(), failure + ) + ) notFound(server) try: - query_elt = result_elt.elements(NS_BS, 'query').next() - streamhost_elt = query_elt.elements(NS_BS, 'streamhost').next() - host = streamhost_elt['host'] - jid_ = streamhost_elt['jid'] - port = streamhost_elt['port'] + query_elt = result_elt.elements(NS_BS, "query").next() + streamhost_elt = query_elt.elements(NS_BS, "streamhost").next() + host = streamhost_elt["host"] + jid_ = streamhost_elt["jid"] + port = streamhost_elt["port"] if not all((host, jid, port)): raise KeyError jid_ = jid.JID(jid_) @@ -818,7 +874,9 @@ if self._np is None: log.warning(u"NAT port plugin not available, we can't map port") else: - ext_port = yield self._np.mapPort(local_port, desc=u"SaT socks5 stream") + ext_port = yield self._np.mapPort( + local_port, desc=u"SaT socks5 stream" + ) if ext_port is None: log.warning(u"Can't map NAT port") else: @@ -843,17 +901,56 @@ # the preferred direct connection ip = local_ips.pop(0) - candidates.append(Candidate(ip, local_port, XEP_0065.TYPE_DIRECT, PRIORITY_BEST_DIRECT, client.jid, priority_local=True, factory=server_factory)) + candidates.append( + Candidate( + ip, + local_port, + XEP_0065.TYPE_DIRECT, + PRIORITY_BEST_DIRECT, + client.jid, + priority_local=True, + factory=server_factory, + ) + ) for ip in local_ips: - candidates.append(Candidate(ip, local_port, XEP_0065.TYPE_DIRECT, PRIORITY_DIRECT, client.jid, priority_local=True, factory=server_factory)) + candidates.append( + Candidate( + ip, + local_port, + XEP_0065.TYPE_DIRECT, + PRIORITY_DIRECT, + client.jid, + priority_local=True, + factory=server_factory, + ) + ) # then the assisted one if ext_port is not None: - candidates.append(Candidate(external_ip, ext_port, XEP_0065.TYPE_ASSISTED, PRIORITY_ASSISTED, client.jid, priority_local=True, factory=server_factory)) + candidates.append( + Candidate( + external_ip, + ext_port, + XEP_0065.TYPE_ASSISTED, + PRIORITY_ASSISTED, + client.jid, + priority_local=True, + factory=server_factory, + ) + ) # finally the proxy if proxy: - candidates.append(Candidate(proxy.host, proxy.port, XEP_0065.TYPE_PROXY, PRIORITY_PROXY, proxy.jid, priority_local=True)) + candidates.append( + Candidate( + proxy.host, + proxy.port, + XEP_0065.TYPE_PROXY, + PRIORITY_PROXY, + proxy.jid, + priority_local=True, + ) + ) # should be already sorted, but just in case the priorities get weird candidates.sort(key=lambda c: c.priority, reverse=True) @@ -870,7 +967,9 @@ candidate.factory.connector = connector return candidate.factory.connection - def connectCandidate(self, client, candidate, session_hash, peer_session_hash=None, delay=None): + def connectCandidate( + self, client, candidate, session_hash, peer_session_hash=None, delay=None + ): """Connect to a candidate Connection will be done with a Socks5ClientFactory @@ -900,16 +999,30 @@ d.addCallback(self._addConnector, candidate) return d - def tryCandidates(self, client, candidates, session_hash, peer_session_hash, connection_cb=None, connection_eb=None): + def tryCandidates( + self, + client, + candidates, + session_hash, + peer_session_hash, + connection_cb=None, + connection_eb=None, + ): defers_list = [] for candidate in candidates: delay = CANDIDATE_DELAY * len(defers_list) if candidate.type == XEP_0065.TYPE_PROXY: delay += CANDIDATE_DELAY_PROXY - d = self.connectCandidate(client, candidate, session_hash, peer_session_hash, delay) + d = self.connectCandidate( + client, candidate, session_hash, peer_session_hash, delay + ) if connection_cb is not None: - d.addCallback(lambda dummy, candidate=candidate, client=client: connection_cb(client, candidate)) + d.addCallback( + lambda dummy, candidate=candidate, client=client: connection_cb( + client, candidate + ) + ) if connection_eb is not None: d.addErrback(connection_eb, client, candidate) defers_list.append(d) @@ -942,9 +1055,11 @@ if failure.check(defer.CancelledError): log.debug(u"Connection of {} has been cancelled".format(candidate)) else: - log.info(u"Connection of {candidate} Failed: {error}".format( - candidate = candidate, - error = failure.value)) + log.info( + u"Connection of {candidate} Failed: {error}".format( + candidate=candidate, error=failure.value + ) + ) candidates[candidates.index(candidate)] = None def allTested(self): @@ -952,7 +1067,14 @@ good_candidates = [c for c in candidates if c] return good_candidates[0] if good_candidates else None - defer_candidates = self.tryCandidates(client, candidates, session_hash, peer_session_hash, connectionCb, connectionEb) + defer_candidates = self.tryCandidates( + client, + candidates, + session_hash, + peer_session_hash, + connectionCb, + connectionEb, + ) d_list = defer.DeferredList(defer_candidates) d_list.addCallback(allTested) return d_list @@ -977,11 +1099,13 @@ @param failure_(None, failure.Failure): None if eveything was fine, a failure else @return (None, failure.Failure): failure_ is returned """ - log.debug(u'Cleaning session with hash {hash}{id}: {reason}'.format( - hash=session_hash, - reason='' if failure_ is None else failure_.value, - id='' if sid is None else u' (id: {})'.format(sid), - )) + log.debug( + u"Cleaning session with hash {hash}{id}: {reason}".format( + hash=session_hash, + reason="" if failure_ is None else failure_.value, + id="" if sid is None else u" (id: {})".format(sid), + ) + ) try: assert self.hash_clients_map[session_hash] == client @@ -1004,7 +1128,7 @@ del client._s5b_sessions[session_hash] try: - session_data['timer'].cancel() + session_data["timer"].cancel() except (internet_error.AlreadyCalled, internet_error.AlreadyCancelled): pass @@ -1025,19 +1149,19 @@ session_data[client] = client def gotCandidates(candidates): - session_data['candidates'] = candidates + session_data["candidates"] = candidates iq_elt = client.IQ() iq_elt["from"] = client.jid.full() iq_elt["to"] = to_jid.full() - query_elt = iq_elt.addElement((NS_BS, 'query')) - query_elt['mode'] = 'tcp' - query_elt['sid'] = sid + query_elt = iq_elt.addElement((NS_BS, "query")) + query_elt["mode"] = "tcp" + query_elt["sid"] = sid for candidate in candidates: - streamhost = query_elt.addElement('streamhost') - streamhost['host'] = candidate.host - streamhost['port'] = str(candidate.port) - streamhost['jid'] = candidate.jid.full() + streamhost = query_elt.addElement("streamhost") + streamhost["host"] = candidate.host + streamhost["port"] = str(candidate.port) + streamhost["jid"] = candidate.jid.full() log.debug(u"Candidate proposed: {}".format(candidate)) d = iq_elt.send() @@ -1055,31 +1179,39 @@ @param iq_elt(domish.Element): <iq> result """ try: - query_elt = iq_elt.elements(NS_BS, 'query').next() - streamhost_used_elt = query_elt.elements(NS_BS, 'streamhost-used').next() + query_elt = iq_elt.elements(NS_BS, "query").next() + streamhost_used_elt = query_elt.elements(NS_BS, "streamhost-used").next() except StopIteration: log.warning(u"No streamhost found in stream query") # FIXME: must clean session return - streamhost_jid = jid.JID(streamhost_used_elt['jid']) + streamhost_jid = jid.JID(streamhost_used_elt["jid"]) try: - candidate = (c for c in session_data['candidates'] if c.jid == streamhost_jid).next() + candidate = ( + c for c in session_data["candidates"] if c.jid == streamhost_jid + ).next() except StopIteration: - log.warning(u"Candidate [{jid}] is unknown !".format(jid=streamhost_jid.full())) + log.warning( + u"Candidate [{jid}] is unknown !".format(jid=streamhost_jid.full()) + ) return else: log.info(u"Candidate choosed by target: {}".format(candidate)) if candidate.type == XEP_0065.TYPE_PROXY: log.info(u"A Socks5 proxy is used") - d = self.connectCandidate(client, candidate, session_data['hash']) - d.addCallback(lambda dummy: candidate.activate(session_data['id'], session_data['peer_jid'], client)) + d = self.connectCandidate(client, candidate, session_data["hash"]) + d.addCallback( + lambda dummy: candidate.activate( + session_data["id"], session_data["peer_jid"], client + ) + ) d.addErrback(self._activationEb) else: d = defer.succeed(None) - d.addCallback(lambda dummy: candidate.startTransfer(session_data['hash'])) + d.addCallback(lambda dummy: candidate.startTransfer(session_data["hash"])) def _activationEb(self, failure): log.warning(u"Proxy activation error: {}".format(failure.value)) @@ -1105,7 +1237,7 @@ @return (dict): session data """ if sid in client.xep_0065_sid_session: - raise exceptions.ConflictError(u'A session with this id already exists !') + raise exceptions.ConflictError(u"A session with this id already exists !") if requester: session_hash = getSessionHash(client.jid, to_jid, sid) session_data = self._registerHash(client, session_hash, stream_object) @@ -1115,15 +1247,19 @@ session_d.addBoth(self.killSession, session_hash, sid, client) session_data = client._s5b_sessions[session_hash] = { DEFER_KEY: session_d, - TIMER_KEY: reactor.callLater(TIMEOUT, self._timeOut, session_hash, client), - } + TIMER_KEY: reactor.callLater( + TIMEOUT, self._timeOut, session_hash, client + ), + } client.xep_0065_sid_session[sid] = session_data session_data.update( - {'id': sid, - 'peer_jid': to_jid, - 'stream_object': stream_object, - 'hash': session_hash, - }) + { + "id": sid, + "peer_jid": to_jid, + "stream_object": stream_object, + "hash": session_hash, + } + ) return session_data @@ -1141,7 +1277,7 @@ """ if client is None: try: - client = self.hash_clients_map[session_hash] + client = self.hash_clients_map[session_hash] except KeyError as e: log.warning(u"The requested session doesn't exists !") raise e @@ -1167,10 +1303,10 @@ session_data = client._s5b_sessions[session_hash] = { DEFER_KEY: session_d, TIMER_KEY: reactor.callLater(TIMEOUT, self._timeOut, session_hash, client), - } + } if stream_object is not None: - session_data['stream_object'] = stream_object + session_data["stream_object"] = stream_object assert session_hash not in self.hash_clients_map self.hash_clients_map[session_hash] = client @@ -1180,22 +1316,22 @@ def associateStreamObject(self, client, session_hash, stream_object): """Associate a stream object with a session""" session_data = self.getSession(client, session_hash) - assert 'stream_object' not in session_data - session_data['stream_object'] = stream_object + assert "stream_object" not in session_data + session_data["stream_object"] = stream_object def streamQuery(self, iq_elt, client): log.debug(u"BS stream query") iq_elt.handled = True - query_elt = iq_elt.elements(NS_BS, 'query').next() + query_elt = iq_elt.elements(NS_BS, "query").next() try: - sid = query_elt['sid'] + sid = query_elt["sid"] except KeyError: log.warning(u"Invalid bystreams request received") return client.sendError(iq_elt, "bad-request") - streamhost_elts = list(query_elt.elements(NS_BS, 'streamhost')) + streamhost_elts = list(query_elt.elements(NS_BS, "streamhost")) if not streamhost_elts: return client.sendError(iq_elt, "bad-request") @@ -1203,7 +1339,7 @@ session_data = client.xep_0065_sid_session[sid] except KeyError: log.warning(u"Ignoring unexpected BS transfer: {}".format(sid)) - return client.sendError(iq_elt, 'not-acceptable') + return client.sendError(iq_elt, "not-acceptable") peer_jid = session_data["peer_jid"] = jid.JID(iq_elt["from"]) @@ -1211,7 +1347,7 @@ nb_sh = len(streamhost_elts) for idx, sh_elt in enumerate(streamhost_elts): try: - host, port, jid_ = sh_elt['host'], sh_elt['port'], jid.JID(sh_elt['jid']) + host, port, jid_ = sh_elt["host"], sh_elt["port"], jid.JID(sh_elt["jid"]) except KeyError: log.warning(u"malformed streamhost element") return client.sendError(iq_elt, "bad-request") @@ -1225,19 +1361,19 @@ for candidate in candidates: log.info(u"Candidate proposed: {}".format(candidate)) - d = self.getBestCandidate(client, candidates, session_data['hash']) + d = self.getBestCandidate(client, candidates, session_data["hash"]) d.addCallback(self._ackStream, iq_elt, session_data, client) def _ackStream(self, candidate, iq_elt, session_data, client): if candidate is None: log.info("No streamhost candidate worked, we have to end negotiation") - return client.sendError(iq_elt, 'item-not-found') + return client.sendError(iq_elt, "item-not-found") log.info(u"We choose: {}".format(candidate)) - result_elt = xmlstream.toResponse(iq_elt, 'result') - query_elt = result_elt.addElement((NS_BS, 'query')) - query_elt['sid'] = session_data['id'] - streamhost_used_elt = query_elt.addElement('streamhost-used') - streamhost_used_elt['jid'] = candidate.jid.full() + result_elt = xmlstream.toResponse(iq_elt, "result") + query_elt = result_elt.addElement((NS_BS, "query")) + query_elt["sid"] = session_data["id"] + streamhost_used_elt = query_elt.addElement("streamhost-used") + streamhost_used_elt["jid"] = candidate.jid.full() client.send(result_elt) @@ -1249,10 +1385,12 @@ self.host = plugin_parent.host def connectionInitialized(self): - self.xmlstream.addObserver(BS_REQUEST, self.plugin_parent.streamQuery, client=self.parent) + self.xmlstream.addObserver( + BS_REQUEST, self.plugin_parent.streamQuery, client=self.parent + ) - def getDiscoInfo(self, requestor, target, nodeIdentifier=''): + def getDiscoInfo(self, requestor, target, nodeIdentifier=""): return [disco.DiscoFeature(NS_BS)] - def getDiscoItems(self, requestor, target, nodeIdentifier=''): + def getDiscoItems(self, requestor, target, nodeIdentifier=""): return []