libervia-backend: diff src/plugins/plugin_xep_0065.py @ 1559:7cc29634b6ef
plugin XEP-0065, XEP-0096: preparation for plugin XEP-0260 implementation:
/!\ SI File Transfer (plugin XEP-0096) is temporarily broken
/!\ proxy handling is temporarily broken
plugin XEP-0096: use of Deferred for plugin XEP-0065 in the same way as for plugin XEP-0047
plugin XEP-0065:
- use of Deferred for sessions
- plugin IP is a dependency
- plugin NAT-PORT is used if available
- everything is now automatic, params are disabled for now (may be re-used in the future to force port or proxy)
- proxy information is managed with a namedtuple
- connection candidates are managed with a dedicated class
- priorities can be used for candidates, as needed for XEP-0260 (see the sketch below)
- transfers can now be managed in both directions, with client or server
- socks5 server is launched on demand, once for all profiles
- helper methods to try and find the best candidate
- connection test and file transfer are done in two steps
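
Two of the points above can be pinned down concretely: the session hash from XEP-0065 §5.3.2 and the candidate priority formula from XEP-0260 §2.2, both of which appear in the new code below. The following standalone sketch mirrors that logic; the plain-string JIDs and the get_session_hash / calculate_priority names are illustrative stand-ins, while the plugin itself works with jid.JID objects and the Candidate class.

    # Standalone sketch of the two computations added by this patch.
    # Plain strings stand in for jid.JID objects (the plugin calls .full()).
    from __future__ import print_function
    import hashlib

    def get_session_hash(from_jid, to_jid, sid):
        """SHA1 hash identifying the Socks5 session (XEP-0065 section 5.3.2)."""
        return hashlib.sha1((sid + from_jid + to_jid).encode('utf-8')).hexdigest()

    # type preference multipliers used by Candidate.calculatePriority()
    TYPE_MULTIPLIERS = {
        'direct': 126,
        'assisted': 120,
        'tunel': 110,  # spelling follows the plugin's TYPE_TUNEL constant
        'proxy': 10,
    }

    def calculate_priority(type_, local_priority):
        """Candidate priority as in XEP-0260 section 2.2."""
        return 2 ** 16 * TYPE_MULTIPLIERS[type_] + local_priority

    if __name__ == '__main__':
        print(get_session_hash(u'requester@example.org/res',
                               u'target@example.org/res',
                               u'session-1'))
        print(calculate_priority('direct', 10000))  # preferred direct candidate
        print(calculate_priority('proxy', 0))       # proxy is the last resort

Running it shows, for instance, that even a direct candidate with the lowest local priority still outranks any proxy candidate, which is the ordering the new getCandidates() relies on.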
author   | Goffi <goffi@goffi.org>
date     | Mon, 02 Nov 2015 22:02:41 +0100
parents  | 3265a2639182
children | 44854fb5d3b2
--- a/src/plugins/plugin_xep_0065.py Mon Nov 02 22:02:41 2015 +0100 +++ b/src/plugins/plugin_xep_0065.py Mon Nov 02 22:02:41 2015 +0100 @@ -23,7 +23,7 @@ # -- -# This program is based on proxy65 (http://code.google.com/p/proxy65), +# This module is based on proxy65 (http://code.google.com/p/proxy65), # originaly written by David Smith and modified by Fabio Forno. # It is sublicensed under AGPL v3 (or any later version) as allowed by the original # license. @@ -57,15 +57,23 @@ from sat.core.i18n import _ from sat.core.log import getLogger log = getLogger(__name__) -from twisted.internet import protocol, reactor -from twisted.internet import error +from sat.core.constants import Const as C +from sat.core import exceptions +from sat.tools import sat_defer +from twisted.internet import protocol +from twisted.internet import reactor +from twisted.internet import error as internet_error from twisted.words.protocols.jabber import jid, client as jabber_client +from twisted.words.protocols.jabber import error as jabber_error from twisted.protocols.basic import FileSender from twisted.words.xish import domish -from twisted.web.client import getPage +from twisted.internet import defer +from twisted.python import failure from sat.core.exceptions import ProfileNotInCacheError +from collections import namedtuple import struct import hashlib +import uuid from zope.interface import implements @@ -80,29 +88,58 @@ NS_BS = 'http://jabber.org/protocol/bytestreams' BS_REQUEST = IQ_SET + '/query[@xmlns="' + NS_BS + '"]' TIMEOUT = 60 # timeout for workflow +DEFER_KEY = 'finished' # key of the deferred used to track session end +SERVER_STARTING_PORT = 0 # starting number for server port search (0 to ask automatic attribution) + +# priorities are candidates local priorities, must be a int between 0 and 65535 +PRIORITY_BEST_DIRECT = 10000 +PRIORITY_DIRECT = 5000 +PRIORITY_ASSISTED = 1000 +PRIORITY_PROXY = 0.2 # proxy is the last option for s5b +CANDIDATE_DELAY = 0.2 # see XEP-0260 §4 +CANDIDATE_DELAY_PROXY = 0.2 # additional time for proxy types (see XEP-0260 §4 note 3) PLUGIN_INFO = { "name": "XEP 0065 Plugin", "import_name": "XEP-0065", "type": "XEP", "protocols": ["XEP-0065"], + "dependencies": ["IP"], + "recommendations": ["NAT-PORT"], "main": "XEP_0065", "handler": "yes", "description": _("""Implementation of SOCKS5 Bytestreams""") } -STATE_INITIAL = 0 -STATE_AUTH = 1 -STATE_REQUEST = 2 -STATE_READY = 3 -STATE_AUTH_USERPASS = 4 -STATE_TARGET_INITIAL = 5 -STATE_TARGET_AUTH = 6 -STATE_TARGET_REQUEST = 7 -STATE_TARGET_READY = 8 -STATE_LAST = 9 +# XXX: by default eveything is automatic +# TODO: use these params to force use of specific proxy/port/IP +# PARAMS = """ +# <params> +# <general> +# <category name="File Transfer"> +# <param name="Force IP" type="string" /> +# <param name="Force Port" type="int" constraint="1;65535" /> +# </category> +# </general> +# <individual> +# <category name="File Transfer"> +# <param name="Force Proxy" value="" type="string" /> +# <param name="Force Proxy host" value="" type="string" /> +# <param name="Force Proxy port" value="" type="int" constraint="1;65535" /> +# </category> +# </individual> +# </params> +# """ -STATE_CONNECT_PENDING = STATE_LAST + 1 +(STATE_INITIAL, +STATE_AUTH, +STATE_REQUEST, +STATE_READY, +STATE_AUTH_USERPASS, +STATE_CLIENT_INITIAL, +STATE_CLIENT_AUTH, +STATE_CLIENT_REQUEST, +) = xrange(8) SOCKS5_VER = 0x05 @@ -129,19 +166,128 @@ REPLY_ADDR_NOT_SUPPORTED = 0x08 -def calculateHash(from_jid, to_jid, sid): - """Calculate SHA1 Hash according to XEP-0065 - @param 
from_jid: jid of the requester - @param to_jid: jid of the target - @param sid: session id - @return: hash (string)""" +ProxyInfos = namedtuple("ProxyInfos", ['host', 'jid', 'port']) + + +class Candidate(object): + + def __init__(self, host, port, type_, priority, jid_, id_=None, priority_local=False, factory=None): + """ + @param host(unicode): host IP or domain + @param port(int): port + @param type_(unicode): stream type (one of XEP_0065.TYPE_*) + @param priority(int): priority + @param jid_(jid.JID): jid + @param id_(None, id_): Candidate ID, or None to generate + @param priority_local(bool): if True, priority is used as local priority, + else priority is used as global one (and local priority is set to 0) + """ + assert isinstance(jid_, jid.JID) + self.host, self.port, self.type, self.jid = ( + host, int(port), type_, jid_) + self.id = id_ if id_ is not None else unicode(uuid.uuid4()) + if priority_local: + self._local_priority = int(priority) + self._priority = self.calculatePriority() + else: + self._local_priority = 0 + self._priority = int(priority) + self.factory = factory + + def discard(self): + """Disconnect a candidate if it is connected + + Used to disconnect tryed client when they are discarded + """ + log.debug(u"Discarding {}".format(self)) + try: + self.factory.discard() + except AttributeError: + pass # no discard for Socks5ServerFactory + + @property + def local_priority(self): + return self._local_priority + + @property + def priority(self): + return self._priority + + def __str__(self): + # similar to __unicode__ but we don't show jid and we encode id + return "Candidate ({0.priority}): host={0.host} port={0.port} type={0.type}{id}".format( + self, + id=u" id={}".format(self.id if self.id is not None else u'').encode('utf-8', 'ignore'), + ) + + def __unicode__(self): + return u"Candidate ({0.priority}): host={0.host} port={0.port} jid={0.jid} type={0.type}{id}".format( + self, + id=u" id={}".format(self.id if self.id is not None else u''), + ) + + def __eq__(self, other): + # self.id is is not used in __eq__ as the same candidate can have + # different ids if proposed by initiator or responder + try: + return (self.host == other.host and + self.port == other.port and + self.jid == other.jid) + except (AttributeError, TypeError): + return False + + def __ne__(self, other): + return not self.__eq__(other) + + def calculatePriority(self): + """Calculate candidate priority according to XEP-0260 §2.2 + + + @return (int): priority + """ + if self.type == XEP_0065.TYPE_DIRECT: + multiplier = 126 + elif self.type == XEP_0065.TYPE_ASSISTED: + multiplier = 120 + elif self.type == XEP_0065.TYPE_TUNEL: + multiplier = 110 + elif self.type == XEP_0065.TYPE_PROXY: + multiplier = 10 + else: + raise exceptions.InternalError(u"Unknown {} type !".format(self.type)) + return 2**16 * multiplier + self._local_priority + + def startTransfer(self, session_hash=None): + self.factory.startTransfer(session_hash) + + +def getSessionHash(from_jid, to_jid, sid): + """Calculate SHA1 Hash according to XEP-0065 §5.3.2 + + @param from_jid(jid.JID): jid of the requester + @param to_jid(jid.JID): jid of the target + @param sid(unicode): session id + @return (str): hash + """ return hashlib.sha1((sid + from_jid.full() + to_jid.full()).encode('utf-8')).hexdigest() class SOCKSv5(protocol.Protocol, FileSender): - def __init__(self): + + def __init__(self, session_hash=None): + """ + @param session_hash(str): hash of the session + must only be used in client mode + """ log.debug(_("Protocol init")) - 
self.state = STATE_INITIAL + self.connection = defer.Deferred() # called when connection/auth is done + if session_hash is not None: + self.server_mode = False + self._session_hash = session_hash + self.state = STATE_CLIENT_INITIAL + else: + self.server_mode = True + self.state = STATE_INITIAL self.buf = "" self.supportedAuthMechs = [AUTHMECH_ANON] self.supportedAddrs = [ADDR_DOMAINNAME] @@ -149,10 +295,20 @@ self.peersock = None self.addressType = 0 self.requestType = 0 + self._file_obj = None + + @property + def file_obj(self): + if self._file_obj is None: + if self.server_mode: + self._file_obj = self.factory.getSession(self._session_hash)["file"] + else: + self._file_obj = self.factory.getSession()['file'] + return self._file_obj def _startNegotiation(self): - log.debug("_startNegotiation") - self.state = STATE_TARGET_AUTH + log.debug("starting negotiation (client mode)") + self.state = STATE_CLIENT_AUTH self.transport.write(struct.pack('!3B', SOCKS5_VER, 1, AUTHMECH_ANON)) def _parseNegotiation(self): @@ -184,6 +340,7 @@ return # No supported mechs found, notify client and close the connection + log.warning(u"Unsupported authentication mechanism") self.transport.write(struct.pack('!BB', SOCKS5_VER, AUTHMECH_INVALID)) self.transport.loseConnection() except struct.error: @@ -258,14 +415,16 @@ self.sendErrorReply(REPLY_CMD_NOT_SUPPORTED) except struct.error: + # The buffer is probably not complete, we need to wait more return None def _makeRequest(self): log.debug("_makeRequest") - self.state = STATE_TARGET_REQUEST - sha1 = calculateHash(self.data["from"], self.data["to"], self.sid) - request = struct.pack('!5B%dsH' % len(sha1), SOCKS5_VER, CMD_CONNECT, 0, ADDR_DOMAINNAME, len(sha1), sha1, 0) + # sha1 = getSessionHash(self.data["from"], self.data["to"], self.sid) + hash_ = self._session_hash + request = struct.pack('!5B%dsH' % len(hash_), SOCKS5_VER, CMD_CONNECT, 0, ADDR_DOMAINNAME, len(hash_), hash_, 0) self.transport.write(request) + self.state = STATE_CLIENT_REQUEST def _parseRequestReply(self): log.debug("_parseRequestReply") @@ -294,49 +453,47 @@ self.loseConnection() return - if self.factory.proxy: - self.state = STATE_READY - self.factory.activateCb(self.sid, self.factory.iq_id, self.startTransfer, self.profile) - else: - self.state = STATE_TARGET_READY - self.factory.activateCb(self.sid, self.factory.iq_id, self.profile) + # if self.factory.proxy: + # self.state = STATE_READY + # self.factory.activateCb(self.sid, self.factory.iq_id, self.startTransfer, self.profile) + # else: + self.state = STATE_READY + self.connection.callback(None) + # self.factory.activateCb(self.sid, self.factory.iq_id, self.profile) except struct.error: + # The buffer is probably not complete, we need to wait more return None def connectionMade(self): - log.debug(u"connectionMade (mode = %s)" % "requester" if isinstance(self.factory, Socks5ServerFactory) else "target") - - if isinstance(self.factory, Socks5ClientFactory): - self.sid = self.factory.sid - self.profile = self.factory.profile - self.data = self.factory.data - self.state = STATE_TARGET_INITIAL + log.debug(u"Socks5 connectionMade (mode = {})".format("server" if self.state == STATE_INITIAL else "client")) + if self.state == STATE_CLIENT_INITIAL: self._startNegotiation() def connectRequested(self, addr, port): log.debug("connectRequested") # Check that this session is expected - if addr not in self.factory.hash_sid_map: - #no: we refuse it + if not self.factory.addToSession(addr, self): self.sendErrorReply(REPLY_CONN_REFUSED) + 
log.warning(u"Unexpected connection request received from {host}" + .format(host=self.transport.getPeer().host)) return - self.sid, self.profile = self.factory.hash_sid_map[addr] - client = self.factory.host.getClient(self.profile) - client.xep_0065_current_stream[self.sid]["start_transfer_cb"] = self.startTransfer + self._session_hash = addr + # self.sid, self.profile = self.factory.hash_profiles_map[addr] + # client = self.factory.host.getClient(self.profile) + # client.xep_0065_current_stream[self.sid]["start_transfer_cb"] = self.startTransfer self.connectCompleted(addr, 0) - self.transport.stopReading() - def startTransfer(self, file_obj): + def startTransfer(self): """Callback called when the result iq is received""" - d = self.beginFileTransfer(file_obj, self.transport) + log.debug(u"Starting file transfer") + d = self.beginFileTransfer(self.file_obj, self.transport) d.addCallback(self.fileTransfered) def fileTransfered(self, d): log.info(_("File transfer completed, closing connection")) self.transport.loseConnection() - self.factory.finishedCb(self.sid, True, self.profile) def connectCompleted(self, remotehost, remoteport): log.debug("connectCompleted") @@ -357,8 +514,9 @@ return True def dataReceived(self, buf): - if self.state == STATE_TARGET_READY: - self.data["file_obj"].write(buf) + if self.state == STATE_READY: + # Everything is set, we just have to write the incoming data + self.file_obj.write(buf) return self.buf = self.buf + buf @@ -368,216 +526,483 @@ self._parseUserPass() if self.state == STATE_REQUEST: self._parseRequest() - if self.state == STATE_TARGET_AUTH: + if self.state == STATE_CLIENT_REQUEST: + self._parseRequestReply() + if self.state == STATE_CLIENT_AUTH: ver, method = struct.unpack('!BB', buf) self.buf = self.buf[2:] if ver != SOCKS5_VER or method != AUTHMECH_ANON: self.transport.loseConnection() else: self._makeRequest() - if self.state == STATE_TARGET_REQUEST: - self._parseRequestReply() - - def clientConnectionLost(self, reason): - log.debug("clientConnectionLost") - self.transport.loseConnection() def connectionLost(self, reason): - log.debug("connectionLost") - if self.state != STATE_CONNECT_PENDING: - self.transport.unregisterProducer() - if self.peersock is not None: - self.peersock.peersock = None - self.peersock.transport.unregisterProducer() - self.peersock = None + log.debug(u"Socks5 connection lost: {}".format(reason.value)) + # self.transport.unregisterProducer() + # if self.peersock is not None: + # self.peersock.peersock = None + # self.peersock.transport.unregisterProducer() + # self.peersock = None + if self.state != STATE_READY: + self.connection.errback(reason) + if self.server_mode : + self.factory.removeFromSession(self._session_hash, self, reason) class Socks5ServerFactory(protocol.ServerFactory): protocol = SOCKSv5 - def __init__(self, host, hash_sid_map, finishedCb): - self.host = host - self.hash_sid_map = hash_sid_map - self.finishedCb = finishedCb + def __init__(self, parent): + """ + @param parent(XEP_0065): XEP_0065 parent instance + """ + self.parent = parent + + def getSession(self, session_hash): + return self.parent.getSession(session_hash, None) + + def startTransfer(self, session_hash): + session = self.getSession(session_hash) + try: + protocol = session['protocols'][0] + except (KeyError, IndexError): + log.error(u"Can't start file transfer, can't find protocol") + else: + protocol.startTransfer() + + def addToSession(self, session_hash, protocol): + """Check is session_hash is valid, and associate protocol with it - def 
startedConnecting(self, connector): - log.debug(_("Socks 5 server connection started")) + the session will be associated to the corresponding candidate + @param session_hash(str): hash of the session + @param protocol(SOCKSv5): protocol instance + @param return(bool): True if hash was valid (i.e. expected), False else + """ + try: + session_data = self.getSession(session_hash) + except KeyError: + return False + else: + session_data.setdefault('protocols', []).append(protocol) + return True + + def removeFromSession(self, session_hash, protocol, reason): + """Remove a protocol from session_data - def clientConnectionLost(self, connector, reason): - log.debug(_(u"Socks 5 server connection lost (reason: %s)") % reason) + There can be several protocol instances while candidates are tried, they + have removed when candidate connection is closed + @param session_hash(str): hash of the session + @param protocol(SOCKSv5): protocol instance + @param reason(failure.Failure): reason of the removal + """ + try: + protocols = self.getSession(session_hash)['protocols'] + protocols.remove(protocol) + except (KeyError, ValueError): + log.error(u"Protocol not found in session while it should be there") + else: + if not protocols: + # The last protocol has been removed, session is finished + if reason.check(internet_error.ConnectionDone): + self.getSession(session_hash)[DEFER_KEY].callback(None) + else: + self.getSession(session_hash)[DEFER_KEY].errback(reason) class Socks5ClientFactory(protocol.ClientFactory): protocol = SOCKSv5 - def __init__(self, current_stream, sid, iq_id, activateCb, finishedCb, proxy=False, profile=None): + # def __init__(self, stream_data, sid, iq_id, activateCb, finishedCb, proxy=False, profile=C.PROF_KEY_NONE): + def __init__(self, parent, session_hash, profile): """Init the Client Factory - @param current_stream: current streams data - @param sid: Session ID - @param iq_id: iq id used to initiate the stream - @param activateCb: method to call to activate the stream - @param finishedCb: method to call when the stream session is finished - @param proxy: True if we are connecting throught a proxy (and we are a requester) - @param profile: %(doc_profile)s""" - assert(profile) - self.data = current_stream[sid] - self.sid = sid - self.iq_id = iq_id - self.activateCb = activateCb - self.finishedCb = finishedCb - self.proxy = proxy + + @param session_hash(unicode): hash of the session + hash is the same as hostname computer in XEP-0065 § 5.3.2 #1 + @param profile(unciode): %(doc_profile)s + """ + self.session = parent.getSession(session_hash, profile) + self.session_hash = session_hash self.profile = profile + self.connection = defer.Deferred() + self._protocol_instance = None + self.connector = None + self._discarded = False + # self.data = stream_data[sid] + # self.sid = sid + # self.iq_id = iq_id + # self.activateCb = activateCb + # self.finishedCb = finishedCb + # self.proxy = proxy + # self.profile = profile - def startedConnecting(self, connector): - log.debug(_("Socks 5 client connection started")) + def discard(self): + """Disconnect the client + + Also set a discarded flag, which avoid to call the session Deferred + """ + self.connector.disconnect() + self._discarded = True + + def getSession(self): + return self.session + + def startTransfer(self, dummy=None): + self._protocol_instance.startTransfer() + + def clientConnectionFailed(self, connector, reason): + log.debug(u"Connection failed") + self.connection.errback(reason) def clientConnectionLost(self, connector, reason): - 
log.debug(_(u"Socks 5 client connection lost (reason: %s)") % reason) - self.finishedCb(self.sid, reason.type == error.ConnectionDone, self.profile) # TODO: really check if the state is actually successful + log.debug(_(u"Socks 5 client connection lost (reason: %s)") % reason.value) + self._protocol_instance = None + if not self._discarded: + # This one was used for the transfer, than mean that + # the Socks5 session is finished + if reason.check(internet_error.ConnectionDone): + self.getSession()[DEFER_KEY].callback(None) + else: + self.getSession()[DEFER_KEY].errback(reason) + # self.finishedCb(self.sid, reason.type == internet_error.ConnectionDone, self.profile) # TODO: really check if the state is actually successful + + def buildProtocol(self, addr): + log.debug(("Socks 5 client connection started")) + p = self.protocol(session_hash=self.session_hash) + p.factory = self + p.connection.chainDeferred(self.connection) + self._protocol_instance = p + return p class XEP_0065(object): - NAMESPACE = NS_BS - - params = """ - <params> - <general> - <category name="File Transfer"> - <param name="IP" value='0.0.0.0' default_cb='yes' type="string" /> - <param name="Port" value="28915" type="int" constraint="1;65535" /> - </category> - </general> - <individual> - <category name="File Transfer"> - <param name="Proxy" value="" type="string" /> - <param name="Proxy host" value="" type="string" /> - <param name="Proxy port" value="" type="int" constraint="1;65535" /> - </category> - </individual> - </params> - """ + TYPE_DIRECT = 'direct' + TYPE_ASSISTED = 'assisted' + TYPE_TUNEL = 'tunel' + TYPE_PROXY = 'proxy' + Candidate = Candidate def __init__(self, host): log.info(_("Plugin XEP_0065 initialization")) - - #session data - self.hash_sid_map = {} # key: hash of the transfer session, value: (session id, profile) + self.host = host - self.host = host - log.debug(_("registering")) - self.server_factory = Socks5ServerFactory(host, self.hash_sid_map, lambda sid, success, profile: self._killId(sid, success, profile=profile)) + # session data + self.hash_profiles_map = {} # key: hash of the transfer session, value: session data + self._cache_proxies = {} # key: server jid, value: proxy data + + # misc data + self._server_factory = None + self._external_port = None - #parameters - host.memory.updateParams(XEP_0065.params) - host.memory.setDefault("IP", "File Transfer", self.getExternalIP) - port = int(self.host.memory.getParamA("Port", "File Transfer")) + # plugins shortcuts + self._ip = self.host.plugins['IP'] + try: + self._np = self.host.plugins['NAT-PORT'] + except KeyError: + log.debug(u"NAT Port plugin not available") + self._np = None - log.info(_("Launching Socks5 Stream server on port %d") % port) - reactor.listenTCP(port, self.server_factory) + # parameters + # XXX: params are not used for now, but they may be used in the futur to force proxy/IP + # host.memory.updateParams(PARAMS) def getHandler(self, profile): return XEP_0065_handler(self) def profileConnected(self, profile): client = self.host.getClient(profile) - client.xep_0065_current_stream = {} # key: stream_id, value: data(dict) + client.xep_0065_current_stream = {} # key: stream_id, value: session_data(dict) + client._s5b_sessions = {} + + def getSessionHash(self, from_jid, to_jid, sid): + return getSessionHash(from_jid, to_jid, sid) + + def getSocks5ServerFactory(self): + """Return server factory + + The server is created if it doesn't exists yet + self._server_factory_port is set on server creation + """ - def getExternalIP(self): - 
"""Return IP visible from outside, by asking to a website""" - return getPage("http://www.goffi.org/sat_tools/get_ip.php") + if self._server_factory is None: + # self._server_factory = Socks5ServerFactory(self.host, self.hash_profiles_map, lambda sid, client: self._killSession(sid, client)) + self._server_factory = Socks5ServerFactory(self) + for port in xrange(SERVER_STARTING_PORT, 65356): + try: + listening_port = reactor.listenTCP(port, self._server_factory) + except internet_error.CannotListenError as e: + log.debug(u"Cannot listen on port {port}: {err_msg}{err_num}".format( + port=port, + err_msg=e.socketError.strerror, + err_num=u' (error code: {})'.format(e.socketError.errno), + )) + else: + self._server_factory_port = listening_port.getHost().port + break - def getProgress(self, sid, data, profile): - """Fill data with position of current transfer""" + log.info(_("Socks5 Stream server launched on port {}").format(self._server_factory_port)) + return self._server_factory + + @defer.inlineCallbacks + def getProxy(self, profile): + """Return the proxy available for this profile + + cache is used between profiles using the same server + @param profile: %(doc_profile)s + @return ((D)(ProxyInfos, None)): Found proxy infos, + or None if not acceptable proxy is found + """ + def notFound(server): + log.info(u"No proxy found on this server") + self._cache_proxies[server] = None + defer.returnValue(None) client = self.host.getClient(profile) + server = client.jid.host try: - file_obj = client.xep_0065_current_stream[sid]["file_obj"] - data["position"] = str(file_obj.tell()) - data["size"] = str(client.xep_0065_current_stream[sid]["size"]) - except: + defer.returnValue(self._cache_proxies[server]) + except KeyError: pass + try: + proxy = (yield self.host.findServiceEntities('proxy', 'bytestreams', profile=profile)).pop() + except (exceptions.CancelError, StopIteration): + notFound(server) + iq_elt = client.IQ('get') + iq_elt['to'] = proxy.full() + iq_elt.addElement('query', NS_BS) - def _timeOut(self, sid, profile): + try: + result_elt = yield iq_elt.send() + except jabber_error.StanzaError as failure: + log.warning(u"Error while requesting proxy info on {jid}: {error}" + .format(proxy.full(), failure)) + notFound(server) + + try: + query_elt = result_elt.elements(NS_BS, 'query').next() + streamhost_elt = query_elt.elements(NS_BS, 'streamhost').next() + host = streamhost_elt['host'] + jid_ = streamhost_elt['jid'] + port = streamhost_elt['port'] + if not all((host, jid, port)): + raise KeyError + jid_ = jid.JID(jid_) + except (StopIteration, KeyError, RuntimeError, jid.InvalidFormat): + log.warning(u"Invalid proxy data received from {}".format(proxy.full())) + notFound(server) + + proxy_infos = self._cache_proxies[server] = ProxyInfos(host, jid_, port) + log.info(u"Proxy found: {}".format(proxy_infos)) + defer.returnValue(proxy_infos) + + @defer.inlineCallbacks + def _getNetworkData(self, client): + """Retrieve information about network + + @param client: %(doc_client)s + @return (D(tuple[local_port, external_port, local_ips, external_ip])): network data + """ + self.getSocks5ServerFactory() + local_port = self._server_factory_port + external_ip = yield self._ip.getExternalIP(client.profile) + local_ips = yield self._ip.getLocalIPs(client.profile) + + if not local_ips: + log.warning(u"Can't find local IPs, we can't do direct connection") + else: + if external_ip is not None and self._external_port is None: + if external_ip != local_ips[0]: + log.info(u"We are probably behind a NAT") + if 
self._np is None: + log.warning(u"NAT port plugin not available, we can't map port") + else: + ext_port = yield self._np.mapPort(local_port, desc=u"SaT socks5 stream") + if ext_port is None: + log.warning(u"Can't map NAT port") + else: + self._external_port = ext_port + + defer.returnValue((local_port, self._external_port, local_ips, external_ip)) + + @defer.inlineCallbacks + def getCandidates(self, profile): + """Return a list of our stream candidates + + @param profile: %(doc_profile)s + @return (D(list[Candidate])): list of candidates, ordered by priority + """ + client = self.host.getClient(profile) + server_factory = yield self.getSocks5ServerFactory() + local_port, ext_port, local_ips, external_ip = yield self._getNetworkData(client) + proxy = yield self.getProxy(profile) + + # its time to gather the candidates + candidates = [] + + # first the direct ones + if local_ips: + # the preferred direct connection + ip = local_ips.pop(0) + candidates.append(Candidate(ip, local_port, XEP_0065.TYPE_DIRECT, PRIORITY_BEST_DIRECT, client.jid, priority_local=True, factory=server_factory)) + for ip in local_ips: + candidates.append(Candidate(ip, local_port, XEP_0065.TYPE_DIRECT, PRIORITY_DIRECT, client.jid, priority_local=True, factory=server_factory)) + + # then the assisted one + if ext_port is not None: + candidates.append(Candidate(external_ip, ext_port, XEP_0065.TYPE_ASSISTED, PRIORITY_ASSISTED, client.jid, priority_local=True, factory=server_factory)) + + # finally the proxy + if proxy: + candidates.append(Candidate(proxy.host, proxy.port, XEP_0065.TYPE_PROXY, PRIORITY_PROXY, proxy.jid, priority_local=True)) + + # should be already sorted, but just in case the priorities get weird + candidates.sort(key=lambda c: c.priority, reverse=True) + + defer.returnValue(candidates) + + def _addConnector(self, connector, candidate): + """Add connector used to connect to candidate, and return client factory's connection Deferred + + the connector can be used to disconnect the candidate, and returning the factory's connection Deferred allow to wait for connection completion + @param connector: a connector implementing IConnector + @param candidate(Candidate): candidate linked to the connector + @return (D): Deferred fired when factory connection is done or has failed + """ + candidate.factory.connector = connector + return candidate.factory.connection + + def tryCandidates(self, candidates, session_hash, connection_cb=None, connection_eb=None, profile=C.PROF_KEY_NONE): + defers_list = [] + + for candidate in candidates: + factory = Socks5ClientFactory(self, session_hash, profile) + candidate.factory = factory + delay = CANDIDATE_DELAY * len(defers_list) + if candidate.type == XEP_0065.TYPE_PROXY: + delay += CANDIDATE_DELAY_PROXY + d = sat_defer.DelayedDeferred(delay, candidate.host) + d.addCallback(reactor.connectTCP, candidate.port, factory) + d.addCallback(self._addConnector, candidate) + if connection_cb is not None: + d.addCallback(lambda dummy, candidate=candidate, profile=profile: connection_cb(candidate, profile)) + if connection_eb is not None: + d.addErrback(connection_eb, candidate, profile) + defers_list.append(d) + + return defers_list + + def getBestCandidate(self, candidates, session_hash, profile=C.PROF_KEY_NONE): + defer_candidates = None + + def connectionCb(candidate, profile): + log.info(u"Connection of {} successful".format(unicode(candidate))) + for idx, other_candidate in enumerate(candidates): + try: + if other_candidate.priority < candidate.priority: + log.debug(u"Cancelling 
{}".format(other_candidate)) + defer_candidates[idx].cancel() + except AttributeError: + assert other_candidate is None + + def connectionEb(failure, candidate, profile): + if failure.check(defer.CancelledError): + log.debug(u"Connection of {} has been cancelled".format(candidate)) + else: + log.info(u"Connection of {candidate} Failed: {error}".format( + candidate = candidate, + error = failure.value)) + candidates[candidates.index(candidate)] = None + + def allTested(self): + log.debug(u"All candidates have been tested") + good_candidates = [c for c in candidates if c] + return good_candidates[0] if good_candidates else None + + defer_candidates = self.tryCandidates(candidates, session_hash, connectionCb, connectionEb, profile) + d_list = defer.DeferredList(defer_candidates) + d_list.addCallback(allTested) + return d_list + + def _timeOut(self, sid, client): """Delecte current_stream id, called after timeout @param id: id of client.xep_0065_current_stream""" - log.info(_("Socks5 Bytestream: TimeOut reached for id %(sid)s [%(profile)s]") - % {"sid": sid, "profile": profile}) - self._killId(sid, False, "TIMEOUT", profile) + log.info(_("Socks5 Bytestream: TimeOut reached for id {sid} [{profile}]").format( + sid=sid, profile=client.profile)) + self._killSession(sid, client, u"TIMEOUT") + + def _killSession(self, sid, client, failure_reason=None): + """Delete a current_stream id, clean up associated observers - def _killId(self, sid, success=False, failure_reason="UNKNOWN", profile=None): - """Delete an current_stream id, clean up associated observers - @param sid: id of client.xep_0065_current_stream""" - assert(profile) - client = self.host.getClient(profile) - if sid not in client.xep_0065_current_stream: + @param sid(unicode): session id + @param client: %(doc_client)s + @param failure_reason(None, unicode): if None the session is successful + else, will be used to call failure_cb + """ + try: + session = client.xep_0065_current_stream[sid] + except KeyError: log.warning(_("kill id called on a non existant id")) return - if "observer_cb" in client.xep_0065_current_stream[sid]: - xmlstream = client.xep_0065_current_stream[sid]["xmlstream"] - xmlstream.removeObserver(client.xep_0065_current_stream[sid]["event_data"], client.xep_0065_current_stream[sid]["observer_cb"]) - if client.xep_0065_current_stream[sid]['timer'].active(): - client.xep_0065_current_stream[sid]['timer'].cancel() - if "size" in client.xep_0065_current_stream[sid]: - self.host.removeProgressCB(sid, profile) + + try: + observer_cb = session['observer_cb'] + except KeyError: + pass + else: + client.xmlstream.removeObserver(session["event_data"], observer_cb) + + if session['timer'].active(): + session['timer'].cancel() - file_obj = client.xep_0065_current_stream[sid]['file_obj'] - success_cb = client.xep_0065_current_stream[sid]['success_cb'] - failure_cb = client.xep_0065_current_stream[sid]['failure_cb'] + del client.xep_0065_current_stream[sid] - session_hash = client.xep_0065_current_stream[sid].get('hash') - del client.xep_0065_current_stream[sid] - if session_hash in self.hash_sid_map: - #FIXME: check that self.hash_sid_map is correctly cleaned in all cases (timeout, normal flow, etc). - del self.hash_sid_map[session_hash] + # FIXME: to check + try: + session_hash = session.get['hash'] + del self.hash_profiles_map[session_hash] + # FIXME: check that self.hash_profiles_map is correctly cleaned in all cases (timeout, normal flow, etc). 
+ except KeyError: + log.debug(u"Not hash found for this session") + pass + + success = failure_reason is None + stream_d = session[DEFER_KEY] if success: - success_cb(sid, file_obj, NS_BS, profile) + stream_d.callback(None) else: - failure_cb(sid, file_obj, NS_BS, failure_reason, profile) + stream_d.errback(failure.Failure(exceptions.DataError(failure_reason))) - def startStream(self, file_obj, to_jid, sid, length, successCb, failureCb, size=None, profile=None): + def startStream(self, file_obj, to_jid, sid, profile=C.PROF_KEY_NONE): """Launch the stream workflow + @param file_obj: file_obj to send @param to_jid: JID of the recipient @param sid: Stream session id - @param length: number of byte to send, or None to send until the end @param successCb: method to call when stream successfuly finished @param failureCb: method to call when something goes wrong - @param profile: %(doc_profile)s""" - assert(profile) + @param profile: %(doc_profile)s + """ client = self.host.getClient(profile) - - if length is not None: - log.error(_('stream length not managed yet')) - return - - profile_jid = client.jid - xmlstream = client.xmlstream + session_data = self._createSession(file_obj, to_jid, sid, client.profile) - data = client.xep_0065_current_stream[sid] = {} - data["timer"] = reactor.callLater(TIMEOUT, self._timeOut, sid, profile) - data["file_obj"] = file_obj - data["from"] = profile_jid - data["to"] = to_jid - data["success_cb"] = successCb - data["failure_cb"] = failureCb - data["xmlstream"] = xmlstream - data["hash"] = calculateHash(profile_jid, to_jid, sid) - self.hash_sid_map[data["hash"]] = (sid, profile) - if size: - data["size"] = size - self.host.registerProgressCB(sid, self.getProgress, profile) - iq_elt = jabber_client.IQ(xmlstream, 'set') - iq_elt["from"] = profile_jid.full() + session_data["to"] = to_jid + session_data["xmlstream"] = client.xmlstream + hash_ = session_data["hash"] = getSessionHash(client.jid, to_jid, sid) + + self.hash_profiles_map[hash_] = (sid, profile) + + iq_elt = jabber_client.IQ(client.xmlstream, 'set') + iq_elt["from"] = client.jid.full() iq_elt["to"] = to_jid.full() query_elt = iq_elt.addElement('query', NS_BS) query_elt['mode'] = 'tcp' query_elt['sid'] = sid + #first streamhost: direct connection streamhost = query_elt.addElement('streamhost') streamhost['host'] = self.host.memory.getParamA("IP", "File Transfer") streamhost['port'] = self.host.memory.getParamA("Port", "File Transfer") - streamhost['jid'] = profile_jid.full() + streamhost['jid'] = client.jid.full() #second streamhost: mediated connection, using proxy streamhost = query_elt.addElement('streamhost') @@ -585,60 +1010,68 @@ streamhost['port'] = self.host.memory.getParamA("Proxy port", "File Transfer", profile_key=profile) streamhost['jid'] = self.host.memory.getParamA("Proxy", "File Transfer", profile_key=profile) - iq_elt.addCallback(self.iqResult, sid, profile) + iq_elt.addCallback(self._IQOpen, session_data, client) iq_elt.send() + return session_data[DEFER_KEY] - def iqResult(self, sid, profile, iq_elt): - """Called when the result of open iq is received""" + def _IQOpen(self, session_data, client, iq_elt): + """Called when the result of open iq is received + + @param session_data(dict): data of the session + @param client: %(doc_client)s + @param iq_elt(domish.Element): <iq> result + """ + sid = session_data['id'] if iq_elt["type"] == "error": - log.warning(_("Transfer failed")) - return - client = self.host.getClient(profile) - try: - data = client.xep_0065_current_stream[sid] - file_obj = 
data["file_obj"] - timer = data["timer"] - except KeyError: - log.error(_("Internal error, can't do transfer")) + log.warning(_("Socks5 transfer failed")) + # FIXME: must clean session return - if timer.active(): - timer.cancel() + try: + session_data = client.xep_0065_current_stream[sid] + file_obj = session_data["file_obj"] + timer = session_data["timer"] + except KeyError: + raise exceptions.InternalError - profile_jid, xmlstream = self.host.getJidNStream(profile) - query_elt = iq_elt.firstChildElement() - streamhost_elts = filter(lambda elt: elt.name == 'streamhost-used', query_elt.elements()) + timer.reset(TIMEOUT) + + query_elt = iq_elt.elements(NS_BS, 'query').next() + streamhost_elts = list(query_elt.elements(NS_BS, 'streamhost-used')) + if not streamhost_elts: log.warning(_("No streamhost found in stream query")) + # FIXME: must clean session return + # FIXME: must be cleaned ! + streamhost_jid = streamhost_elts[0]['jid'] - if streamhost_jid != profile_jid.full(): + if streamhost_jid != client.jid.full(): log.debug(_("A proxy server is used")) - proxy_host = self.host.memory.getParamA("Proxy host", "File Transfer", profile_key=profile) - proxy_port = self.host.memory.getParamA("Proxy port", "File Transfer", profile_key=profile) - proxy_jid = self.host.memory.getParamA("Proxy", "File Transfer", profile_key=profile) + proxy_host = self.host.memory.getParamA("Proxy host", "File Transfer", profile_key=client.profile) + proxy_port = self.host.memory.getParamA("Proxy port", "File Transfer", profile_key=client.profile) + proxy_jid = self.host.memory.getParamA("Proxy", "File Transfer", profile_key=client.profile) if proxy_jid != streamhost_jid: log.warning(_("Proxy jid is not the same as in parameters, this should not happen")) return - factory = Socks5ClientFactory(client.xep_0065_current_stream, sid, None, self.activateProxyStream, lambda sid, success, profile: self._killId(sid, success, profile=profile), True, profile) + factory = Socks5ClientFactory(client.xep_0065_current_stream, sid, None, self.activateProxyStream, lambda sid, success, profile: self._killSession(sid, client), True, client.profile) reactor.connectTCP(proxy_host, int(proxy_port), factory) else: - data["start_transfer_cb"](file_obj) # We now activate the stream + session_data["start_transfer_cb"](file_obj) # We now activate the stream def activateProxyStream(self, sid, iq_id, start_transfer_cb, profile): log.debug(_("activating stream")) client = self.host.getClient(profile) - data = client.xep_0065_current_stream[sid] - profile_jid, xmlstream = self.host.getJidNStream(profile) + session_data = client.xep_0065_current_stream[sid] - iq_elt = client.IQ(xmlstream, 'set') - iq_elt["from"] = profile_jid.full() + iq_elt = client.IQ(client.xmlstream, 'set') + iq_elt["from"] = client.jid.full() iq_elt["to"] = self.host.memory.getParamA("Proxy", "File Transfer", profile_key=profile) query_elt = iq_elt.addElement('query', NS_BS) query_elt['sid'] = sid - query_elt.addElement('activate', content=data['to'].full()) - iq_elt.addCallback(self.proxyResult, sid, start_transfer_cb, data['file_obj']) + query_elt.addElement('activate', content=session_data['to'].full()) + iq_elt.addCallback(self.proxyResult, sid, start_transfer_cb, session_data['file_obj']) iq_elt.send() def proxyResult(self, sid, start_transfer_cb, file_obj, iq_elt): @@ -648,26 +1081,95 @@ else: start_transfer_cb(file_obj) - def prepareToReceive(self, from_jid, sid, file_obj, size, success_cb, failure_cb, profile): + def createSession(self, *args, **kwargs): + """like 
[_createSession] but return the session deferred instead of the whole session + + session deferred is fired when transfer is finished + """ + return self._createSession(*args, **kwargs)[DEFER_KEY] + + def _createSession(self, file_obj, to_jid, sid, profile): """Called when a bytestream is imminent - @param from_jid: jid of the sender - @param sid: Stream id - @param file_obj: File object where data will be written - @param size: full size of the data, or None if unknown - @param success_cb: method to call when successfuly finished - @param failure_cb: method to call when something goes wrong - @param profile: %(doc_profile)s""" + + @param file_obj(file): File object where data will be written + @param to_jid(jid.JId): jid of the other peer + @param sid(unicode): session id + @param profile: %(doc_profile)s + @return (dict): session data + """ client = self.host.getClient(profile) - data = client.xep_0065_current_stream[sid] = {} - data["from"] = from_jid - data["file_obj"] = file_obj - data["seq"] = -1 - if size: - data["size"] = size - self.host.registerProgressCB(sid, self.getProgress, profile) - data["timer"] = reactor.callLater(TIMEOUT, self._timeOut, sid, profile) - data["success_cb"] = success_cb - data["failure_cb"] = failure_cb + if sid in client.xep_0065_current_stream: + raise exceptions.ConflictError(u'A session with this id already exists !') + session_data = client.xep_0065_current_stream[sid] = \ + {'id': sid, + DEFER_KEY: defer.Deferred(), + 'to': to_jid, + 'file_obj': file_obj, + 'seq': -1, # FIXME: to check + 'timer': reactor.callLater(TIMEOUT, self._timeOut, sid, client), + } + + return session_data + + def getSession(self, session_hash, profile): + """Return session data + + @param session_hash(unicode): hash of the session + hash is the same as hostname computer in XEP-0065 § 5.3.2 #1 + @param profile(None, unicode): profile of the peer + None is used only if profile is unknown (this is only the case + for incoming request received by Socks5ServerFactory). None must + only be used by Socks5ServerFactory. + See comments below for details + @return (dict): session data + """ + if profile is None: + try: + profile = self.hash_profiles_map[session_hash] + except KeyError as e: + log.warning(u"The requested session doesn't exists !") + raise e + client = self.host.getClient(profile) + return client._s5b_sessions[session_hash] + + def registerHash(self, *args, **kwargs): + """like [_registerHash] but resutrn the session deferred instead of the whole session + session deferred is fired when transfer is finished + """ + return self._registerHash(*args, **kwargs)[DEFER_KEY] + + def _registerHash(self, session_hash, file_obj, profile): + """Create a session_data associated to hash + + @param session_hash(str): hash of the session + @param file_obj(file): file-like object + @param profile: %(doc_profile)s + return (dict): session data + """ + client = self.host.getClient(profile) + assert session_hash not in client._s5b_sessions + session_data = client._s5b_sessions[session_hash] = { + "file": file_obj, + DEFER_KEY: defer.Deferred(), + } + if session_hash in self.hash_profiles_map: + # The only case when 2 profiles want to register the same hash + # is when they are on the same instance + log.info(u"Both Socks5 peers are on the same instance") + # XXX:If both peers are on the same instance, they'll register the same + # session_hash, so we'll have 2 profiles for the same hash. The first + # one will be the responder (and so the second one the initiator). 
+ # As we'll keep the initiator choosed candidate (see XEP-0260 § 2.4 #4), + # responder will handle the Socks5 server. Only the server will use + # self.hash_profiles_map to get the profile, so we can ignore the second + # one (the initiator profile). + # There is no easy way to known if the incoming connection + # to the Socks5Server is from initiator or responder, so this seams a + # reasonable workaround. + else: + self.hash_profiles_map[session_hash] = profile + + return session_data def streamQuery(self, iq_elt, profile): """Get file using byte stream""" @@ -710,23 +1212,23 @@ client.xep_0065_current_stream[sid]["streamhost"] = (sh_host, sh_port, sh_jid) log.info(_("Stream proposed: host=[%(host)s] port=[%(port)s]") % {'host': sh_host, 'port': sh_port}) - factory = Socks5ClientFactory(client.xep_0065_current_stream, sid, iq_elt["id"], self.activateStream, lambda sid, success, profile: self._killId(sid, success, profile=profile), profile=profile) + factory = Socks5ClientFactory(client.xep_0065_current_stream, sid, iq_elt["id"], self.activateStream, lambda sid, success, profile: self._killSession(sid, client), profile=profile) reactor.connectTCP(sh_host, int(sh_port), factory) def activateStream(self, sid, iq_id, profile): client = self.host.getClient(profile) log.debug(_("activating stream")) result = domish.Element((None, 'iq')) - data = client.xep_0065_current_stream[sid] + session_data = client.xep_0065_current_stream[sid] result['type'] = 'result' result['id'] = iq_id - result['from'] = data["to"].full() - result['to'] = data["from"].full() + result['from'] = session_data["to"].full() + result['to'] = session_data["from"].full() query = result.addElement('query', NS_BS) query['sid'] = sid streamhost = query.addElement('streamhost-used') - streamhost['jid'] = data["streamhost"][2] - data["xmlstream"].send(result) + streamhost['jid'] = session_data["streamhost"][2] + session_data["xmlstream"].send(result) def sendNotAcceptableError(self, iq_id, to_jid, xmlstream): """Not acceptable error used when the stream is not expected or something is going wrong @@ -764,50 +1266,8 @@ self.plugin_parent = plugin_parent self.host = plugin_parent.host - def _proxyDataResult(self, iq_elt): - """Called with the information about proxy according to XEP-0065 #4 - Params should be filled with these infos""" - if iq_elt["type"] == "error": - log.warning(_("Can't determine proxy information")) - return - query_elt = iq_elt.firstChildElement() - if query_elt.name != "query": - log.warning(_("Bad answer received from proxy")) - return - streamhost_elts = filter(lambda elt: elt.name == 'streamhost', query_elt.elements()) - if not streamhost_elts: - log.warning(_("No streamhost found in stream query")) - return - if len(streamhost_elts) != 1: - log.warning(_("Multiple streamhost elements in proxy not managed, keeping only the first one")) - streamhost_elt = streamhost_elts[0] - self.host.memory.setParam("Proxy", streamhost_elt.getAttribute("jid", ""), - "File Transfer", profile_key=self.parent.profile) - self.host.memory.setParam("Proxy host", streamhost_elt.getAttribute("host", ""), - "File Transfer", profile_key=self.parent.profile) - self.host.memory.setParam("Proxy port", streamhost_elt.getAttribute("port", ""), - "File Transfer", profile_key=self.parent.profile) - def connectionInitialized(self): - def connection_ok(dummy): - self.xmlstream.addObserver(BS_REQUEST, self.plugin_parent.streamQuery, profile=self.parent.profile) - proxy = self.host.memory.getParamA("Proxy", "File Transfer", 
profile_key=self.parent.profile) - if not proxy: - def proxiesFound(entities): - try: - proxy_ent = entities.pop() - except KeyError: - log.info(_("No proxy found on this server")) - return - iq_elt = jabber_client.IQ(self.parent.xmlstream, 'get') - iq_elt["to"] = proxy_ent.full() - iq_elt.addElement('query', NS_BS) - iq_elt.addCallback(self._proxyDataResult) - iq_elt.send() - d = self.host.findServiceEntities("proxy", "bytestreams", profile_key=self.parent.profile) - d.addCallback(proxiesFound) - self.parent.getConnectionDeferred().addCallback(connection_ok) - + self.xmlstream.addObserver(BS_REQUEST, self.plugin_parent.streamQuery, profile=self.parent.profile) def getDiscoInfo(self, requestor, target, nodeIdentifier=''): return [disco.DiscoFeature(NS_BS)]
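
The candidate racing implemented by tryCandidates() and getBestCandidate() above is the heart of the new workflow: each candidate is attempted after a staggered delay (CANDIDATE_DELAY per earlier candidate, plus CANDIDATE_DELAY_PROXY for proxy types), and as soon as one connects the remaining lower-priority attempts are cancelled. The self-contained Twisted sketch below reproduces that pattern with simulated connections; the candidate list and the fake_connect() helper are illustrative stand-ins for reactor.connectTCP() and the Socks5ClientFactory negotiation.

    # Minimal sketch of the staggered-delay candidate race, assuming simulated
    # connections instead of real reactor.connectTCP() calls.
    from __future__ import print_function
    from twisted.internet import defer, reactor, task

    CANDIDATE_DELAY = 0.2        # see XEP-0260 section 4
    CANDIDATE_DELAY_PROXY = 0.2  # extra delay for proxy candidates

    def fake_connect(name):
        """Stand-in for the TCP connection + SOCKS5 negotiation."""
        print("connected through %s" % name)
        return name

    def try_candidates(candidates):
        """Schedule one connection attempt per candidate, most preferred first."""
        attempts = []
        for idx, (name, is_proxy) in enumerate(candidates):
            delay = CANDIDATE_DELAY * idx
            if is_proxy:
                delay += CANDIDATE_DELAY_PROXY
            attempts.append(task.deferLater(reactor, delay, fake_connect, name))
        return attempts

    def ignore_cancel(failure):
        failure.trap(defer.CancelledError)  # cancelled attempts are expected

    def main():
        candidates = [("direct", False), ("assisted", False), ("proxy", True)]
        attempts = try_candidates(candidates)

        def keep_best(name, winner_idx):
            # a candidate connected: discard every lower-priority attempt
            for other in attempts[winner_idx + 1:]:
                other.cancel()
            return name

        for idx, attempt in enumerate(attempts):
            attempt.addCallback(keep_best, idx)
            attempt.addErrback(ignore_cancel)

        done = defer.DeferredList(attempts)
        done.addCallback(lambda results: print("best candidate: %s" %
                                               next(r for ok, r in results if r)))
        done.addBoth(lambda _: reactor.stop())

    reactor.callWhenRunning(main)
    reactor.run()

In the real plugin the cancellation goes through Candidate.discard() so that already-connected but unused clients are also disconnected, and the winning candidate is then activated for the actual file transfer.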