changeset 1559:7cc29634b6ef

plugin XEP-0065, XEP-0096: preparation for plugin XEP-0260 implementation: /!\ SI File Transfert (plugin XEP-0096) is temporarily broken /!\ proxy handling is temporarily broken plugin XEP-0096: use of Deferred for plugin XEP-0065 in the same way as for plugin XEP-0047 plugin XEP-0065: - use of Deferred for sessions - plugin IP is a dependency - plugin NAT-PORT is used if available - everything is now automatic, params are disabled for now (may be re-used in the future to force port or proxy) - proxy infos are managed with a namedtuple - connexion candidates are managed with a dedicate class - priorities can be used for candidates, as needed for XEP-0260 - transfer can now be managed in both direction, with client or server - socks5 server is launcher on demand, once for all profiles - helper methods to try and find best candidate - connection test and file transfer are done in 2 times
author Goffi <goffi@goffi.org>
date Mon, 02 Nov 2015 22:02:41 +0100 (2015-11-02)
parents 6a8dd91476f0
children dcce63810733
files src/plugins/plugin_xep_0065.py src/plugins/plugin_xep_0096.py
diffstat 2 files changed, 767 insertions(+), 307 deletions(-) [+]
line wrap: on
line diff
--- a/src/plugins/plugin_xep_0065.py	Mon Nov 02 22:02:41 2015 +0100
+++ b/src/plugins/plugin_xep_0065.py	Mon Nov 02 22:02:41 2015 +0100
@@ -23,7 +23,7 @@
 
 # --
 
-# This program is based on proxy65 (http://code.google.com/p/proxy65),
+# This module is based on proxy65 (http://code.google.com/p/proxy65),
 # originaly written by David Smith and modified by Fabio Forno.
 # It is sublicensed under AGPL v3 (or any later version) as allowed by the original
 # license.
@@ -57,15 +57,23 @@
 from sat.core.i18n import _
 from sat.core.log import getLogger
 log = getLogger(__name__)
-from twisted.internet import protocol, reactor
-from twisted.internet import error
+from sat.core.constants import Const as C
+from sat.core import exceptions
+from sat.tools import sat_defer
+from twisted.internet import protocol
+from twisted.internet import reactor
+from twisted.internet import error as internet_error
 from twisted.words.protocols.jabber import jid, client as jabber_client
+from twisted.words.protocols.jabber import error as jabber_error
 from twisted.protocols.basic import FileSender
 from twisted.words.xish import domish
-from twisted.web.client import getPage
+from twisted.internet import defer
+from twisted.python import failure
 from sat.core.exceptions import ProfileNotInCacheError
+from collections import namedtuple
 import struct
 import hashlib
+import uuid
 
 from zope.interface import implements
 
@@ -80,29 +88,58 @@
 NS_BS = 'http://jabber.org/protocol/bytestreams'
 BS_REQUEST = IQ_SET + '/query[@xmlns="' + NS_BS + '"]'
 TIMEOUT = 60  # timeout for workflow
+DEFER_KEY = 'finished' # key of the deferred used to track session end
+SERVER_STARTING_PORT = 0 # starting number for server port search (0 to ask automatic attribution)
+
+# priorities are candidates local priorities, must be a int between 0 and 65535
+PRIORITY_BEST_DIRECT = 10000
+PRIORITY_DIRECT = 5000
+PRIORITY_ASSISTED = 1000
+PRIORITY_PROXY = 0.2 # proxy is the last option for s5b
+CANDIDATE_DELAY = 0.2 # see XEP-0260 §4
+CANDIDATE_DELAY_PROXY = 0.2 # additional time for proxy types (see XEP-0260 §4 note 3)
 
 PLUGIN_INFO = {
     "name": "XEP 0065 Plugin",
     "import_name": "XEP-0065",
     "type": "XEP",
     "protocols": ["XEP-0065"],
+    "dependencies": ["IP"],
+    "recommendations": ["NAT-PORT"],
     "main": "XEP_0065",
     "handler": "yes",
     "description": _("""Implementation of SOCKS5 Bytestreams""")
 }
 
-STATE_INITIAL = 0
-STATE_AUTH = 1
-STATE_REQUEST = 2
-STATE_READY = 3
-STATE_AUTH_USERPASS = 4
-STATE_TARGET_INITIAL = 5
-STATE_TARGET_AUTH = 6
-STATE_TARGET_REQUEST = 7
-STATE_TARGET_READY = 8
-STATE_LAST = 9
+# XXX: by default eveything is automatic
+# TODO: use these params to force use of specific proxy/port/IP
+# PARAMS = """
+#     <params>
+#     <general>
+#     <category name="File Transfer">
+#         <param name="Force IP" type="string" />
+#         <param name="Force Port" type="int" constraint="1;65535" />
+#     </category>
+#     </general>
+#     <individual>
+#     <category name="File Transfer">
+#         <param name="Force Proxy" value="" type="string" />
+#         <param name="Force Proxy host" value="" type="string" />
+#         <param name="Force Proxy port" value="" type="int" constraint="1;65535" />
+#     </category>
+#     </individual>
+#     </params>
+#     """
 
-STATE_CONNECT_PENDING = STATE_LAST + 1
+(STATE_INITIAL,
+STATE_AUTH,
+STATE_REQUEST,
+STATE_READY,
+STATE_AUTH_USERPASS,
+STATE_CLIENT_INITIAL,
+STATE_CLIENT_AUTH,
+STATE_CLIENT_REQUEST,
+) = xrange(8)
 
 SOCKS5_VER = 0x05
 
@@ -129,19 +166,128 @@
 REPLY_ADDR_NOT_SUPPORTED = 0x08
 
 
-def calculateHash(from_jid, to_jid, sid):
-    """Calculate SHA1 Hash according to XEP-0065
-    @param from_jid: jid of the requester
-    @param to_jid: jid of the target
-    @param sid: session id
-    @return: hash (string)"""
+ProxyInfos = namedtuple("ProxyInfos", ['host', 'jid', 'port'])
+
+
+class Candidate(object):
+
+    def __init__(self, host, port, type_, priority, jid_, id_=None, priority_local=False, factory=None):
+        """
+        @param host(unicode): host IP or domain
+        @param port(int): port
+        @param type_(unicode): stream type (one of XEP_0065.TYPE_*)
+        @param priority(int): priority
+        @param jid_(jid.JID): jid
+        @param id_(None, id_): Candidate ID, or None to generate
+        @param priority_local(bool): if True, priority is used as local priority,
+            else priority is used as global one (and local priority is set to 0)
+        """
+        assert isinstance(jid_, jid.JID)
+        self.host, self.port, self.type, self.jid = (
+            host, int(port), type_, jid_)
+        self.id = id_ if id_ is not None else unicode(uuid.uuid4())
+        if priority_local:
+            self._local_priority = int(priority)
+            self._priority = self.calculatePriority()
+        else:
+            self._local_priority = 0
+            self._priority = int(priority)
+        self.factory = factory
+
+    def discard(self):
+        """Disconnect a candidate if it is connected
+
+        Used to disconnect tryed client when they are discarded
+        """
+        log.debug(u"Discarding {}".format(self))
+        try:
+            self.factory.discard()
+        except AttributeError:
+            pass # no discard for Socks5ServerFactory
+
+    @property
+    def local_priority(self):
+        return self._local_priority
+
+    @property
+    def priority(self):
+        return self._priority
+
+    def __str__(self):
+        # similar to __unicode__ but we don't show jid and we encode id
+        return "Candidate ({0.priority}): host={0.host} port={0.port} type={0.type}{id}".format(
+            self,
+            id=u" id={}".format(self.id if self.id is not None else u'').encode('utf-8', 'ignore'),
+            )
+
+    def __unicode__(self):
+        return u"Candidate ({0.priority}): host={0.host} port={0.port} jid={0.jid} type={0.type}{id}".format(
+            self,
+            id=u" id={}".format(self.id if self.id is not None else u''),
+            )
+
+    def __eq__(self, other):
+        # self.id is is not used in __eq__ as the same candidate can have
+        # different ids if proposed by initiator or responder
+        try:
+            return (self.host == other.host and
+                    self.port == other.port and
+                    self.jid == other.jid)
+        except (AttributeError, TypeError):
+            return False
+
+    def __ne__(self, other):
+        return not self.__eq__(other)
+
+    def calculatePriority(self):
+        """Calculate candidate priority according to XEP-0260 §2.2
+
+
+        @return (int): priority
+        """
+        if self.type == XEP_0065.TYPE_DIRECT:
+            multiplier = 126
+        elif self.type == XEP_0065.TYPE_ASSISTED:
+            multiplier = 120
+        elif self.type == XEP_0065.TYPE_TUNEL:
+            multiplier = 110
+        elif self.type == XEP_0065.TYPE_PROXY:
+            multiplier = 10
+        else:
+            raise exceptions.InternalError(u"Unknown {} type !".format(self.type))
+        return 2**16 * multiplier + self._local_priority
+
+    def startTransfer(self, session_hash=None):
+        self.factory.startTransfer(session_hash)
+
+
+def getSessionHash(from_jid, to_jid, sid):
+    """Calculate SHA1 Hash according to XEP-0065 §5.3.2
+
+    @param from_jid(jid.JID): jid of the requester
+    @param to_jid(jid.JID): jid of the target
+    @param sid(unicode): session id
+    @return (str): hash
+    """
     return hashlib.sha1((sid + from_jid.full() + to_jid.full()).encode('utf-8')).hexdigest()
 
 
 class SOCKSv5(protocol.Protocol, FileSender):
-    def __init__(self):
+
+    def __init__(self, session_hash=None):
+        """
+        @param session_hash(str): hash of the session
+            must only be used in client mode
+        """
         log.debug(_("Protocol init"))
-        self.state = STATE_INITIAL
+        self.connection = defer.Deferred() # called when connection/auth is done
+        if session_hash is not None:
+            self.server_mode = False
+            self._session_hash = session_hash
+            self.state = STATE_CLIENT_INITIAL
+        else:
+            self.server_mode = True
+            self.state = STATE_INITIAL
         self.buf = ""
         self.supportedAuthMechs = [AUTHMECH_ANON]
         self.supportedAddrs = [ADDR_DOMAINNAME]
@@ -149,10 +295,20 @@
         self.peersock = None
         self.addressType = 0
         self.requestType = 0
+        self._file_obj = None
+
+    @property
+    def file_obj(self):
+        if self._file_obj is None:
+            if self.server_mode:
+                self._file_obj = self.factory.getSession(self._session_hash)["file"]
+            else:
+                self._file_obj = self.factory.getSession()['file']
+        return self._file_obj
 
     def _startNegotiation(self):
-        log.debug("_startNegotiation")
-        self.state = STATE_TARGET_AUTH
+        log.debug("starting negotiation (client mode)")
+        self.state = STATE_CLIENT_AUTH
         self.transport.write(struct.pack('!3B', SOCKS5_VER, 1, AUTHMECH_ANON))
 
     def _parseNegotiation(self):
@@ -184,6 +340,7 @@
                     return
 
             # No supported mechs found, notify client and close the connection
+            log.warning(u"Unsupported authentication mechanism")
             self.transport.write(struct.pack('!BB', SOCKS5_VER, AUTHMECH_INVALID))
             self.transport.loseConnection()
         except struct.error:
@@ -258,14 +415,16 @@
                 self.sendErrorReply(REPLY_CMD_NOT_SUPPORTED)
 
         except struct.error:
+            # The buffer is probably not complete, we need to wait more
             return None
 
     def _makeRequest(self):
         log.debug("_makeRequest")
-        self.state = STATE_TARGET_REQUEST
-        sha1 = calculateHash(self.data["from"], self.data["to"], self.sid)
-        request = struct.pack('!5B%dsH' % len(sha1), SOCKS5_VER, CMD_CONNECT, 0, ADDR_DOMAINNAME, len(sha1), sha1, 0)
+        # sha1 = getSessionHash(self.data["from"], self.data["to"], self.sid)
+        hash_ = self._session_hash
+        request = struct.pack('!5B%dsH' % len(hash_), SOCKS5_VER, CMD_CONNECT, 0, ADDR_DOMAINNAME, len(hash_), hash_, 0)
         self.transport.write(request)
+        self.state = STATE_CLIENT_REQUEST
 
     def _parseRequestReply(self):
         log.debug("_parseRequestReply")
@@ -294,49 +453,47 @@
                 self.loseConnection()
                 return
 
-            if self.factory.proxy:
-                self.state = STATE_READY
-                self.factory.activateCb(self.sid, self.factory.iq_id, self.startTransfer, self.profile)
-            else:
-                self.state = STATE_TARGET_READY
-                self.factory.activateCb(self.sid, self.factory.iq_id, self.profile)
+            # if self.factory.proxy:
+            #     self.state = STATE_READY
+            #     self.factory.activateCb(self.sid, self.factory.iq_id, self.startTransfer, self.profile)
+            # else:
+            self.state = STATE_READY
+            self.connection.callback(None)
+            # self.factory.activateCb(self.sid, self.factory.iq_id, self.profile)
 
         except struct.error:
+            # The buffer is probably not complete, we need to wait more
             return None
 
     def connectionMade(self):
-        log.debug(u"connectionMade (mode = %s)" % "requester" if isinstance(self.factory, Socks5ServerFactory) else "target")
-
-        if isinstance(self.factory, Socks5ClientFactory):
-            self.sid = self.factory.sid
-            self.profile = self.factory.profile
-            self.data = self.factory.data
-            self.state = STATE_TARGET_INITIAL
+        log.debug(u"Socks5 connectionMade (mode = {})".format("server" if self.state == STATE_INITIAL else "client"))
+        if self.state == STATE_CLIENT_INITIAL:
             self._startNegotiation()
 
     def connectRequested(self, addr, port):
         log.debug("connectRequested")
 
         # Check that this session is expected
-        if addr not in self.factory.hash_sid_map:
-            #no: we refuse it
+        if not self.factory.addToSession(addr, self):
             self.sendErrorReply(REPLY_CONN_REFUSED)
+            log.warning(u"Unexpected connection request received from {host}"
+                .format(host=self.transport.getPeer().host))
             return
-        self.sid, self.profile = self.factory.hash_sid_map[addr]
-        client = self.factory.host.getClient(self.profile)
-        client.xep_0065_current_stream[self.sid]["start_transfer_cb"] = self.startTransfer
+        self._session_hash = addr
+        # self.sid, self.profile = self.factory.hash_profiles_map[addr]
+        # client = self.factory.host.getClient(self.profile)
+        # client.xep_0065_current_stream[self.sid]["start_transfer_cb"] = self.startTransfer
         self.connectCompleted(addr, 0)
-        self.transport.stopReading()
 
-    def startTransfer(self, file_obj):
+    def startTransfer(self):
         """Callback called when the result iq is received"""
-        d = self.beginFileTransfer(file_obj, self.transport)
+        log.debug(u"Starting file transfer")
+        d = self.beginFileTransfer(self.file_obj, self.transport)
         d.addCallback(self.fileTransfered)
 
     def fileTransfered(self, d):
         log.info(_("File transfer completed, closing connection"))
         self.transport.loseConnection()
-        self.factory.finishedCb(self.sid, True, self.profile)
 
     def connectCompleted(self, remotehost, remoteport):
         log.debug("connectCompleted")
@@ -357,8 +514,9 @@
         return True
 
     def dataReceived(self, buf):
-        if self.state == STATE_TARGET_READY:
-            self.data["file_obj"].write(buf)
+        if self.state == STATE_READY:
+            # Everything is set, we just have to write the incoming data
+            self.file_obj.write(buf)
             return
 
         self.buf = self.buf + buf
@@ -368,216 +526,483 @@
             self._parseUserPass()
         if self.state == STATE_REQUEST:
             self._parseRequest()
-        if self.state == STATE_TARGET_AUTH:
+        if self.state == STATE_CLIENT_REQUEST:
+            self._parseRequestReply()
+        if self.state == STATE_CLIENT_AUTH:
             ver, method = struct.unpack('!BB', buf)
             self.buf = self.buf[2:]
             if ver != SOCKS5_VER or method != AUTHMECH_ANON:
                 self.transport.loseConnection()
             else:
                 self._makeRequest()
-        if self.state == STATE_TARGET_REQUEST:
-            self._parseRequestReply()
-
-    def clientConnectionLost(self, reason):
-        log.debug("clientConnectionLost")
-        self.transport.loseConnection()
 
     def connectionLost(self, reason):
-        log.debug("connectionLost")
-        if self.state != STATE_CONNECT_PENDING:
-            self.transport.unregisterProducer()
-            if self.peersock is not None:
-                self.peersock.peersock = None
-                self.peersock.transport.unregisterProducer()
-                self.peersock = None
+        log.debug(u"Socks5 connection lost: {}".format(reason.value))
+        # self.transport.unregisterProducer()
+        # if self.peersock is not None:
+        #     self.peersock.peersock = None
+        #     self.peersock.transport.unregisterProducer()
+        #     self.peersock = None
+        if self.state != STATE_READY:
+            self.connection.errback(reason)
+        if self.server_mode :
+            self.factory.removeFromSession(self._session_hash, self, reason)
 
 
 class Socks5ServerFactory(protocol.ServerFactory):
     protocol = SOCKSv5
 
-    def __init__(self, host, hash_sid_map, finishedCb):
-        self.host = host
-        self.hash_sid_map = hash_sid_map
-        self.finishedCb = finishedCb
+    def __init__(self, parent):
+        """
+        @param parent(XEP_0065): XEP_0065 parent instance
+        """
+        self.parent = parent
+
+    def getSession(self, session_hash):
+        return self.parent.getSession(session_hash, None)
+
+    def startTransfer(self, session_hash):
+        session = self.getSession(session_hash)
+        try:
+            protocol = session['protocols'][0]
+        except (KeyError, IndexError):
+            log.error(u"Can't start file transfer, can't find protocol")
+        else:
+            protocol.startTransfer()
+
+    def addToSession(self, session_hash, protocol):
+        """Check is session_hash is valid, and associate protocol with it
 
-    def startedConnecting(self, connector):
-        log.debug(_("Socks 5 server connection started"))
+        the session will be associated to the corresponding candidate
+        @param session_hash(str): hash of the session
+        @param protocol(SOCKSv5): protocol instance
+        @param return(bool): True if hash was valid (i.e. expected), False else
+        """
+        try:
+            session_data = self.getSession(session_hash)
+        except KeyError:
+            return False
+        else:
+            session_data.setdefault('protocols', []).append(protocol)
+            return True
+
+    def removeFromSession(self, session_hash, protocol, reason):
+        """Remove a protocol from session_data
 
-    def clientConnectionLost(self, connector, reason):
-        log.debug(_(u"Socks 5 server connection lost (reason: %s)") % reason)
+        There can be several protocol instances while candidates are tried, they
+        have removed when candidate connection is closed
+        @param session_hash(str): hash of the session
+        @param protocol(SOCKSv5): protocol instance
+        @param reason(failure.Failure): reason of the removal
+        """
+        try:
+            protocols = self.getSession(session_hash)['protocols']
+            protocols.remove(protocol)
+        except (KeyError, ValueError):
+            log.error(u"Protocol not found in session while it should be there")
+        else:
+            if not protocols:
+                # The last protocol has been removed, session is finished
+                if reason.check(internet_error.ConnectionDone):
+                    self.getSession(session_hash)[DEFER_KEY].callback(None)
+                else:
+                    self.getSession(session_hash)[DEFER_KEY].errback(reason)
 
 
 class Socks5ClientFactory(protocol.ClientFactory):
     protocol = SOCKSv5
 
-    def __init__(self, current_stream, sid, iq_id, activateCb, finishedCb, proxy=False, profile=None):
+    # def __init__(self, stream_data, sid, iq_id, activateCb, finishedCb, proxy=False, profile=C.PROF_KEY_NONE):
+    def __init__(self, parent, session_hash, profile):
         """Init the Client Factory
-        @param current_stream: current streams data
-        @param sid: Session ID
-        @param iq_id: iq id used to initiate the stream
-        @param activateCb: method to call to activate the stream
-        @param finishedCb: method to call when the stream session is finished
-        @param proxy: True if we are connecting throught a proxy (and we are a requester)
-        @param profile: %(doc_profile)s"""
-        assert(profile)
-        self.data = current_stream[sid]
-        self.sid = sid
-        self.iq_id = iq_id
-        self.activateCb = activateCb
-        self.finishedCb = finishedCb
-        self.proxy = proxy
+
+        @param session_hash(unicode): hash of the session
+            hash is the same as hostname computer in XEP-0065 § 5.3.2 #1
+        @param profile(unciode): %(doc_profile)s
+        """
+        self.session = parent.getSession(session_hash, profile)
+        self.session_hash = session_hash
         self.profile = profile
+        self.connection = defer.Deferred()
+        self._protocol_instance = None
+        self.connector = None
+        self._discarded = False
+        # self.data = stream_data[sid]
+        # self.sid = sid
+        # self.iq_id = iq_id
+        # self.activateCb = activateCb
+        # self.finishedCb = finishedCb
+        # self.proxy = proxy
+        # self.profile = profile
 
-    def startedConnecting(self, connector):
-        log.debug(_("Socks 5 client connection started"))
+    def discard(self):
+        """Disconnect the client
+
+        Also set a discarded flag, which avoid to call the session Deferred
+        """
+        self.connector.disconnect()
+        self._discarded = True
+
+    def getSession(self):
+        return self.session
+
+    def startTransfer(self, dummy=None):
+        self._protocol_instance.startTransfer()
+
+    def clientConnectionFailed(self, connector, reason):
+        log.debug(u"Connection failed")
+        self.connection.errback(reason)
 
     def clientConnectionLost(self, connector, reason):
-        log.debug(_(u"Socks 5 client connection lost (reason: %s)") % reason)
-        self.finishedCb(self.sid, reason.type == error.ConnectionDone, self.profile)  # TODO: really check if the state is actually successful
+        log.debug(_(u"Socks 5 client connection lost (reason: %s)") % reason.value)
+        self._protocol_instance = None
+        if not self._discarded:
+            # This one was used for the transfer, than mean that
+            # the Socks5 session is finished
+            if reason.check(internet_error.ConnectionDone):
+                self.getSession()[DEFER_KEY].callback(None)
+            else:
+                self.getSession()[DEFER_KEY].errback(reason)
+        # self.finishedCb(self.sid, reason.type == internet_error.ConnectionDone, self.profile)  # TODO: really check if the state is actually successful
+
+    def buildProtocol(self, addr):
+        log.debug(("Socks 5 client connection started"))
+        p = self.protocol(session_hash=self.session_hash)
+        p.factory = self
+        p.connection.chainDeferred(self.connection)
+        self._protocol_instance = p
+        return p
 
 
 class XEP_0065(object):
-
     NAMESPACE = NS_BS
-
-    params = """
-    <params>
-    <general>
-    <category name="File Transfer">
-        <param name="IP" value='0.0.0.0' default_cb='yes' type="string" />
-        <param name="Port" value="28915" type="int" constraint="1;65535" />
-    </category>
-    </general>
-    <individual>
-    <category name="File Transfer">
-        <param name="Proxy" value="" type="string" />
-        <param name="Proxy host" value="" type="string" />
-        <param name="Proxy port" value="" type="int" constraint="1;65535" />
-    </category>
-    </individual>
-    </params>
-    """
+    TYPE_DIRECT = 'direct'
+    TYPE_ASSISTED = 'assisted'
+    TYPE_TUNEL = 'tunel'
+    TYPE_PROXY = 'proxy'
+    Candidate = Candidate
 
     def __init__(self, host):
         log.info(_("Plugin XEP_0065 initialization"))
-
-        #session data
-        self.hash_sid_map = {}  # key: hash of the transfer session, value: (session id, profile)
+        self.host = host
 
-        self.host = host
-        log.debug(_("registering"))
-        self.server_factory = Socks5ServerFactory(host, self.hash_sid_map, lambda sid, success, profile: self._killId(sid, success, profile=profile))
+        # session data
+        self.hash_profiles_map = {}  # key: hash of the transfer session, value: session data
+        self._cache_proxies = {} # key: server jid, value: proxy data
+
+        # misc data
+        self._server_factory = None
+        self._external_port = None
 
-        #parameters
-        host.memory.updateParams(XEP_0065.params)
-        host.memory.setDefault("IP", "File Transfer", self.getExternalIP)
-        port = int(self.host.memory.getParamA("Port", "File Transfer"))
+        # plugins shortcuts
+        self._ip = self.host.plugins['IP']
+        try:
+            self._np = self.host.plugins['NAT-PORT']
+        except KeyError:
+            log.debug(u"NAT Port plugin not available")
+            self._np = None
 
-        log.info(_("Launching Socks5 Stream server on port %d") % port)
-        reactor.listenTCP(port, self.server_factory)
+        # parameters
+        # XXX: params are not used for now, but they may be used in the futur to force proxy/IP
+        # host.memory.updateParams(PARAMS)
 
     def getHandler(self, profile):
         return XEP_0065_handler(self)
 
     def profileConnected(self, profile):
         client = self.host.getClient(profile)
-        client.xep_0065_current_stream = {}  # key: stream_id, value: data(dict)
+        client.xep_0065_current_stream = {}  # key: stream_id, value: session_data(dict)
+        client._s5b_sessions = {}
+
+    def getSessionHash(self, from_jid, to_jid, sid):
+        return getSessionHash(from_jid, to_jid, sid)
+
+    def getSocks5ServerFactory(self):
+        """Return server factory
+
+        The server is created if it doesn't exists yet
+        self._server_factory_port is set on server creation
+        """
 
-    def getExternalIP(self):
-        """Return IP visible from outside, by asking to a website"""
-        return getPage("http://www.goffi.org/sat_tools/get_ip.php")
+        if self._server_factory is None:
+            # self._server_factory = Socks5ServerFactory(self.host, self.hash_profiles_map, lambda sid, client: self._killSession(sid, client))
+            self._server_factory = Socks5ServerFactory(self)
+            for port in xrange(SERVER_STARTING_PORT, 65356):
+                try:
+                    listening_port = reactor.listenTCP(port, self._server_factory)
+                except internet_error.CannotListenError as e:
+                    log.debug(u"Cannot listen on port {port}: {err_msg}{err_num}".format(
+                        port=port,
+                        err_msg=e.socketError.strerror,
+                        err_num=u' (error code: {})'.format(e.socketError.errno),
+                        ))
+                else:
+                    self._server_factory_port = listening_port.getHost().port
+                    break
 
-    def getProgress(self, sid, data, profile):
-        """Fill data with position of current transfer"""
+            log.info(_("Socks5 Stream server launched on port {}").format(self._server_factory_port))
+        return self._server_factory
+
+    @defer.inlineCallbacks
+    def getProxy(self, profile):
+        """Return the proxy available for this profile
+
+        cache is used between profiles using the same server
+        @param profile: %(doc_profile)s
+        @return ((D)(ProxyInfos, None)): Found proxy infos,
+            or None if not acceptable proxy is found
+        """
+        def notFound(server):
+            log.info(u"No proxy found on this server")
+            self._cache_proxies[server] = None
+            defer.returnValue(None)
         client = self.host.getClient(profile)
+        server = client.jid.host
         try:
-            file_obj = client.xep_0065_current_stream[sid]["file_obj"]
-            data["position"] = str(file_obj.tell())
-            data["size"] = str(client.xep_0065_current_stream[sid]["size"])
-        except:
+            defer.returnValue(self._cache_proxies[server])
+        except KeyError:
             pass
+        try:
+            proxy = (yield self.host.findServiceEntities('proxy', 'bytestreams', profile=profile)).pop()
+        except (exceptions.CancelError, StopIteration):
+            notFound(server)
+        iq_elt = client.IQ('get')
+        iq_elt['to'] = proxy.full()
+        iq_elt.addElement('query', NS_BS)
 
-    def _timeOut(self, sid, profile):
+        try:
+            result_elt = yield iq_elt.send()
+        except jabber_error.StanzaError as failure:
+            log.warning(u"Error while requesting proxy info on {jid}: {error}"
+                .format(proxy.full(), failure))
+            notFound(server)
+
+        try:
+            query_elt = result_elt.elements(NS_BS, 'query').next()
+            streamhost_elt = query_elt.elements(NS_BS, 'streamhost').next()
+            host = streamhost_elt['host']
+            jid_ = streamhost_elt['jid']
+            port = streamhost_elt['port']
+            if not all((host, jid, port)):
+                raise KeyError
+            jid_ = jid.JID(jid_)
+        except (StopIteration, KeyError, RuntimeError, jid.InvalidFormat):
+            log.warning(u"Invalid proxy data received from {}".format(proxy.full()))
+            notFound(server)
+
+        proxy_infos = self._cache_proxies[server] = ProxyInfos(host, jid_, port)
+        log.info(u"Proxy found: {}".format(proxy_infos))
+        defer.returnValue(proxy_infos)
+
+    @defer.inlineCallbacks
+    def _getNetworkData(self, client):
+        """Retrieve information about network
+
+        @param client: %(doc_client)s
+        @return (D(tuple[local_port, external_port, local_ips, external_ip])): network data
+        """
+        self.getSocks5ServerFactory()
+        local_port = self._server_factory_port
+        external_ip = yield self._ip.getExternalIP(client.profile)
+        local_ips = yield self._ip.getLocalIPs(client.profile)
+
+        if not local_ips:
+            log.warning(u"Can't find local IPs, we can't do direct connection")
+        else:
+            if external_ip is not None and self._external_port is None:
+                if external_ip != local_ips[0]:
+                    log.info(u"We are probably behind a NAT")
+                    if self._np is None:
+                        log.warning(u"NAT port plugin not available, we can't map port")
+                    else:
+                        ext_port = yield self._np.mapPort(local_port, desc=u"SaT socks5 stream")
+                        if ext_port is None:
+                            log.warning(u"Can't map NAT port")
+                        else:
+                            self._external_port = ext_port
+
+        defer.returnValue((local_port, self._external_port, local_ips, external_ip))
+
+    @defer.inlineCallbacks
+    def getCandidates(self, profile):
+        """Return a list of our stream candidates
+
+        @param profile: %(doc_profile)s
+        @return (D(list[Candidate])): list of candidates, ordered by priority
+        """
+        client = self.host.getClient(profile)
+        server_factory = yield self.getSocks5ServerFactory()
+        local_port, ext_port, local_ips, external_ip = yield self._getNetworkData(client)
+        proxy = yield self.getProxy(profile)
+
+        # its time to gather the candidates
+        candidates = []
+
+        # first the direct ones
+        if local_ips:
+            # the preferred direct connection
+            ip = local_ips.pop(0)
+            candidates.append(Candidate(ip, local_port, XEP_0065.TYPE_DIRECT, PRIORITY_BEST_DIRECT, client.jid, priority_local=True, factory=server_factory))
+            for ip in local_ips:
+                candidates.append(Candidate(ip, local_port, XEP_0065.TYPE_DIRECT, PRIORITY_DIRECT, client.jid, priority_local=True, factory=server_factory))
+
+        # then the assisted one
+        if ext_port is not None:
+            candidates.append(Candidate(external_ip, ext_port, XEP_0065.TYPE_ASSISTED, PRIORITY_ASSISTED, client.jid, priority_local=True, factory=server_factory))
+
+        # finally the proxy
+        if proxy:
+            candidates.append(Candidate(proxy.host, proxy.port, XEP_0065.TYPE_PROXY, PRIORITY_PROXY, proxy.jid, priority_local=True))
+
+        # should be already sorted, but just in case the priorities get weird
+        candidates.sort(key=lambda c: c.priority, reverse=True)
+
+        defer.returnValue(candidates)
+
+    def _addConnector(self, connector, candidate):
+        """Add connector used to connect to candidate, and return client factory's connection Deferred
+
+        the connector can be used to disconnect the candidate, and returning the factory's connection Deferred allow to wait for connection completion
+        @param connector: a connector implementing IConnector
+        @param candidate(Candidate): candidate linked to the connector
+        @return (D): Deferred fired when factory connection is done or has failed
+        """
+        candidate.factory.connector = connector
+        return candidate.factory.connection
+
+    def tryCandidates(self, candidates, session_hash, connection_cb=None, connection_eb=None, profile=C.PROF_KEY_NONE):
+        defers_list = []
+
+        for candidate in candidates:
+            factory = Socks5ClientFactory(self, session_hash, profile)
+            candidate.factory = factory
+            delay = CANDIDATE_DELAY * len(defers_list)
+            if candidate.type == XEP_0065.TYPE_PROXY:
+                delay += CANDIDATE_DELAY_PROXY
+            d = sat_defer.DelayedDeferred(delay, candidate.host)
+            d.addCallback(reactor.connectTCP, candidate.port, factory)
+            d.addCallback(self._addConnector, candidate)
+            if connection_cb is not None:
+                d.addCallback(lambda dummy, candidate=candidate, profile=profile: connection_cb(candidate, profile))
+            if connection_eb is not None:
+                d.addErrback(connection_eb, candidate, profile)
+            defers_list.append(d)
+
+        return defers_list
+
+    def getBestCandidate(self, candidates, session_hash, profile=C.PROF_KEY_NONE):
+        defer_candidates = None
+
+        def connectionCb(candidate, profile):
+            log.info(u"Connection of {} successful".format(unicode(candidate)))
+            for idx, other_candidate in enumerate(candidates):
+                try:
+                    if other_candidate.priority < candidate.priority:
+                        log.debug(u"Cancelling {}".format(other_candidate))
+                        defer_candidates[idx].cancel()
+                except AttributeError:
+                    assert other_candidate is None
+
+        def connectionEb(failure, candidate, profile):
+            if failure.check(defer.CancelledError):
+                log.debug(u"Connection of {} has been cancelled".format(candidate))
+            else:
+                log.info(u"Connection of {candidate} Failed: {error}".format(
+                    candidate = candidate,
+                    error = failure.value))
+            candidates[candidates.index(candidate)] = None
+
+        def allTested(self):
+            log.debug(u"All candidates have been tested")
+            good_candidates = [c for c in candidates if c]
+            return good_candidates[0] if good_candidates else None
+
+        defer_candidates = self.tryCandidates(candidates, session_hash, connectionCb, connectionEb, profile)
+        d_list = defer.DeferredList(defer_candidates)
+        d_list.addCallback(allTested)
+        return d_list
+
+    def _timeOut(self, sid, client):
         """Delecte current_stream id, called after timeout
         @param id: id of client.xep_0065_current_stream"""
-        log.info(_("Socks5 Bytestream: TimeOut reached for id %(sid)s [%(profile)s]")
-             % {"sid": sid, "profile": profile})
-        self._killId(sid, False, "TIMEOUT", profile)
+        log.info(_("Socks5 Bytestream: TimeOut reached for id {sid} [{profile}]").format(
+            sid=sid, profile=client.profile))
+        self._killSession(sid, client, u"TIMEOUT")
+
+    def _killSession(self, sid, client, failure_reason=None):
+        """Delete a current_stream id, clean up associated observers
 
-    def _killId(self, sid, success=False, failure_reason="UNKNOWN", profile=None):
-        """Delete an current_stream id, clean up associated observers
-        @param sid: id of client.xep_0065_current_stream"""
-        assert(profile)
-        client = self.host.getClient(profile)
-        if sid not in client.xep_0065_current_stream:
+        @param sid(unicode): session id
+        @param client: %(doc_client)s
+        @param failure_reason(None, unicode): if None the session is successful
+            else, will be used to call failure_cb
+        """
+        try:
+            session = client.xep_0065_current_stream[sid]
+        except KeyError:
             log.warning(_("kill id called on a non existant id"))
             return
-        if "observer_cb" in client.xep_0065_current_stream[sid]:
-            xmlstream = client.xep_0065_current_stream[sid]["xmlstream"]
-            xmlstream.removeObserver(client.xep_0065_current_stream[sid]["event_data"], client.xep_0065_current_stream[sid]["observer_cb"])
-        if client.xep_0065_current_stream[sid]['timer'].active():
-            client.xep_0065_current_stream[sid]['timer'].cancel()
-        if "size" in client.xep_0065_current_stream[sid]:
-            self.host.removeProgressCB(sid, profile)
+
+        try:
+            observer_cb = session['observer_cb']
+        except KeyError:
+            pass
+        else:
+            client.xmlstream.removeObserver(session["event_data"], observer_cb)
+
+        if session['timer'].active():
+            session['timer'].cancel()
 
-        file_obj = client.xep_0065_current_stream[sid]['file_obj']
-        success_cb = client.xep_0065_current_stream[sid]['success_cb']
-        failure_cb = client.xep_0065_current_stream[sid]['failure_cb']
+        del client.xep_0065_current_stream[sid]
 
-        session_hash = client.xep_0065_current_stream[sid].get('hash')
-        del client.xep_0065_current_stream[sid]
-        if session_hash in self.hash_sid_map:
-            #FIXME: check that self.hash_sid_map is correctly cleaned in all cases (timeout, normal flow, etc).
-            del self.hash_sid_map[session_hash]
+        # FIXME: to check
+        try:
+            session_hash = session.get['hash']
+            del self.hash_profiles_map[session_hash]
+            # FIXME: check that self.hash_profiles_map is correctly cleaned in all cases (timeout, normal flow, etc).
+        except KeyError:
+            log.debug(u"Not hash found for this session")
+            pass
+
+        success = failure_reason is None
+        stream_d = session[DEFER_KEY]
 
         if success:
-            success_cb(sid, file_obj, NS_BS, profile)
+            stream_d.callback(None)
         else:
-            failure_cb(sid, file_obj, NS_BS, failure_reason, profile)
+            stream_d.errback(failure.Failure(exceptions.DataError(failure_reason)))
 
-    def startStream(self, file_obj, to_jid, sid, length, successCb, failureCb, size=None, profile=None):
+    def startStream(self, file_obj, to_jid, sid, profile=C.PROF_KEY_NONE):
         """Launch the stream workflow
+
         @param file_obj: file_obj to send
         @param to_jid: JID of the recipient
         @param sid: Stream session id
-        @param length: number of byte to send, or None to send until the end
         @param successCb: method to call when stream successfuly finished
         @param failureCb: method to call when something goes wrong
-        @param profile: %(doc_profile)s"""
-        assert(profile)
+        @param profile: %(doc_profile)s
+        """
         client = self.host.getClient(profile)
-
-        if length is not None:
-            log.error(_('stream length not managed yet'))
-            return
-
-        profile_jid = client.jid
-        xmlstream = client.xmlstream
+        session_data = self._createSession(file_obj, to_jid, sid, client.profile)
 
-        data = client.xep_0065_current_stream[sid] = {}
-        data["timer"] = reactor.callLater(TIMEOUT, self._timeOut, sid, profile)
-        data["file_obj"] = file_obj
-        data["from"] = profile_jid
-        data["to"] = to_jid
-        data["success_cb"] = successCb
-        data["failure_cb"] = failureCb
-        data["xmlstream"] = xmlstream
-        data["hash"] = calculateHash(profile_jid, to_jid, sid)
-        self.hash_sid_map[data["hash"]] = (sid, profile)
-        if size:
-            data["size"] = size
-            self.host.registerProgressCB(sid, self.getProgress, profile)
-        iq_elt = jabber_client.IQ(xmlstream, 'set')
-        iq_elt["from"] = profile_jid.full()
+        session_data["to"] = to_jid
+        session_data["xmlstream"] = client.xmlstream
+        hash_ = session_data["hash"] = getSessionHash(client.jid, to_jid, sid)
+
+        self.hash_profiles_map[hash_] = (sid, profile)
+
+        iq_elt = jabber_client.IQ(client.xmlstream, 'set')
+        iq_elt["from"] = client.jid.full()
         iq_elt["to"] = to_jid.full()
         query_elt = iq_elt.addElement('query', NS_BS)
         query_elt['mode'] = 'tcp'
         query_elt['sid'] = sid
+
         #first streamhost: direct connection
         streamhost = query_elt.addElement('streamhost')
         streamhost['host'] = self.host.memory.getParamA("IP", "File Transfer")
         streamhost['port'] = self.host.memory.getParamA("Port", "File Transfer")
-        streamhost['jid'] = profile_jid.full()
+        streamhost['jid'] = client.jid.full()
 
         #second streamhost: mediated connection, using proxy
         streamhost = query_elt.addElement('streamhost')
@@ -585,60 +1010,68 @@
         streamhost['port'] = self.host.memory.getParamA("Proxy port", "File Transfer", profile_key=profile)
         streamhost['jid'] = self.host.memory.getParamA("Proxy", "File Transfer", profile_key=profile)
 
-        iq_elt.addCallback(self.iqResult, sid, profile)
+        iq_elt.addCallback(self._IQOpen, session_data, client)
         iq_elt.send()
+        return session_data[DEFER_KEY]
 
-    def iqResult(self, sid, profile, iq_elt):
-        """Called when the result of open iq is received"""
+    def _IQOpen(self, session_data, client, iq_elt):
+        """Called when the result of open iq is received
+
+        @param session_data(dict): data of the session
+        @param client: %(doc_client)s
+        @param iq_elt(domish.Element): <iq> result
+        """
+        sid = session_data['id']
         if iq_elt["type"] == "error":
-            log.warning(_("Transfer failed"))
-            return
-        client = self.host.getClient(profile)
-        try:
-            data = client.xep_0065_current_stream[sid]
-            file_obj = data["file_obj"]
-            timer = data["timer"]
-        except KeyError:
-            log.error(_("Internal error, can't do transfer"))
+            log.warning(_("Socks5 transfer failed"))
+            # FIXME: must clean session
             return
 
-        if timer.active():
-            timer.cancel()
+        try:
+            session_data = client.xep_0065_current_stream[sid]
+            file_obj = session_data["file_obj"]
+            timer = session_data["timer"]
+        except KeyError:
+            raise exceptions.InternalError
 
-        profile_jid, xmlstream = self.host.getJidNStream(profile)
-        query_elt = iq_elt.firstChildElement()
-        streamhost_elts = filter(lambda elt: elt.name == 'streamhost-used', query_elt.elements())
+        timer.reset(TIMEOUT)
+
+        query_elt = iq_elt.elements(NS_BS, 'query').next()
+        streamhost_elts = list(query_elt.elements(NS_BS, 'streamhost-used'))
+
         if not streamhost_elts:
             log.warning(_("No streamhost found in stream query"))
+            # FIXME: must clean session
             return
 
+        # FIXME: must be cleaned !
+
         streamhost_jid = streamhost_elts[0]['jid']
-        if streamhost_jid != profile_jid.full():
+        if streamhost_jid != client.jid.full():
             log.debug(_("A proxy server is used"))
-            proxy_host = self.host.memory.getParamA("Proxy host", "File Transfer", profile_key=profile)
-            proxy_port = self.host.memory.getParamA("Proxy port", "File Transfer", profile_key=profile)
-            proxy_jid = self.host.memory.getParamA("Proxy", "File Transfer", profile_key=profile)
+            proxy_host = self.host.memory.getParamA("Proxy host", "File Transfer", profile_key=client.profile)
+            proxy_port = self.host.memory.getParamA("Proxy port", "File Transfer", profile_key=client.profile)
+            proxy_jid = self.host.memory.getParamA("Proxy", "File Transfer", profile_key=client.profile)
             if proxy_jid != streamhost_jid:
                 log.warning(_("Proxy jid is not the same as in parameters, this should not happen"))
                 return
-            factory = Socks5ClientFactory(client.xep_0065_current_stream, sid, None, self.activateProxyStream, lambda sid, success, profile: self._killId(sid, success, profile=profile), True, profile)
+            factory = Socks5ClientFactory(client.xep_0065_current_stream, sid, None, self.activateProxyStream, lambda sid, success, profile: self._killSession(sid, client), True, client.profile)
             reactor.connectTCP(proxy_host, int(proxy_port), factory)
         else:
-            data["start_transfer_cb"](file_obj)  # We now activate the stream
+            session_data["start_transfer_cb"](file_obj)  # We now activate the stream
 
     def activateProxyStream(self, sid, iq_id, start_transfer_cb, profile):
         log.debug(_("activating stream"))
         client = self.host.getClient(profile)
-        data = client.xep_0065_current_stream[sid]
-        profile_jid, xmlstream = self.host.getJidNStream(profile)
+        session_data = client.xep_0065_current_stream[sid]
 
-        iq_elt = client.IQ(xmlstream, 'set')
-        iq_elt["from"] = profile_jid.full()
+        iq_elt = client.IQ(client.xmlstream, 'set')
+        iq_elt["from"] = client.jid.full()
         iq_elt["to"] = self.host.memory.getParamA("Proxy", "File Transfer", profile_key=profile)
         query_elt = iq_elt.addElement('query', NS_BS)
         query_elt['sid'] = sid
-        query_elt.addElement('activate', content=data['to'].full())
-        iq_elt.addCallback(self.proxyResult, sid, start_transfer_cb, data['file_obj'])
+        query_elt.addElement('activate', content=session_data['to'].full())
+        iq_elt.addCallback(self.proxyResult, sid, start_transfer_cb, session_data['file_obj'])
         iq_elt.send()
 
     def proxyResult(self, sid, start_transfer_cb, file_obj, iq_elt):
@@ -648,26 +1081,95 @@
         else:
             start_transfer_cb(file_obj)
 
-    def prepareToReceive(self, from_jid, sid, file_obj, size, success_cb, failure_cb, profile):
+    def createSession(self, *args, **kwargs):
+        """like [_createSession] but return the session deferred instead of the whole session
+
+        session deferred is fired when transfer is finished
+        """
+        return self._createSession(*args, **kwargs)[DEFER_KEY]
+
+    def _createSession(self, file_obj, to_jid, sid, profile):
         """Called when a bytestream is imminent
-        @param from_jid: jid of the sender
-        @param sid: Stream id
-        @param file_obj: File object where data will be written
-        @param size: full size of the data, or None if unknown
-        @param success_cb: method to call when successfuly finished
-        @param failure_cb: method to call when something goes wrong
-        @param profile: %(doc_profile)s"""
+
+        @param file_obj(file): File object where data will be written
+        @param to_jid(jid.JId): jid of the other peer
+        @param sid(unicode): session id
+        @param profile: %(doc_profile)s
+        @return (dict): session data
+        """
         client = self.host.getClient(profile)
-        data = client.xep_0065_current_stream[sid] = {}
-        data["from"] = from_jid
-        data["file_obj"] = file_obj
-        data["seq"] = -1
-        if size:
-            data["size"] = size
-            self.host.registerProgressCB(sid, self.getProgress, profile)
-        data["timer"] = reactor.callLater(TIMEOUT, self._timeOut, sid, profile)
-        data["success_cb"] = success_cb
-        data["failure_cb"] = failure_cb
+        if sid in client.xep_0065_current_stream:
+            raise exceptions.ConflictError(u'A session with this id already exists !')
+        session_data = client.xep_0065_current_stream[sid] = \
+            {'id': sid,
+             DEFER_KEY: defer.Deferred(),
+             'to': to_jid,
+             'file_obj': file_obj,
+             'seq': -1, # FIXME: to check
+             'timer': reactor.callLater(TIMEOUT, self._timeOut, sid, client),
+            }
+
+        return session_data
+
+    def getSession(self, session_hash, profile):
+        """Return session data
+
+        @param session_hash(unicode): hash of the session
+            hash is the same as hostname computer in XEP-0065 § 5.3.2 #1
+        @param profile(None, unicode): profile of the peer
+            None is used only if profile is unknown (this is only the case
+            for incoming request received by Socks5ServerFactory). None must
+            only be used by Socks5ServerFactory.
+            See comments below for details
+        @return (dict): session data
+        """
+        if profile is None:
+            try:
+                profile =  self.hash_profiles_map[session_hash]
+            except KeyError as e:
+                log.warning(u"The requested session doesn't exists !")
+                raise e
+        client = self.host.getClient(profile)
+        return client._s5b_sessions[session_hash]
+
+    def registerHash(self, *args, **kwargs):
+        """like [_registerHash] but resutrn the session deferred instead of the whole session
+        session deferred is fired when transfer is finished
+        """
+        return self._registerHash(*args, **kwargs)[DEFER_KEY]
+
+    def _registerHash(self, session_hash, file_obj, profile):
+        """Create a session_data associated to hash
+
+        @param session_hash(str): hash of the session
+        @param file_obj(file): file-like object
+        @param profile: %(doc_profile)s
+        return (dict): session data
+        """
+        client = self.host.getClient(profile)
+        assert session_hash not in client._s5b_sessions
+        session_data = client._s5b_sessions[session_hash] = {
+            "file": file_obj,
+            DEFER_KEY: defer.Deferred(),
+            }
+        if session_hash in self.hash_profiles_map:
+            # The only case when 2 profiles want to register the same hash
+            # is when they are on the same instance
+            log.info(u"Both Socks5 peers are on the same instance")
+            # XXX:If both peers are on the same instance, they'll register the same
+            #     session_hash, so we'll have 2 profiles for the same hash. The first
+            #     one will be the responder (and so the second one the initiator).
+            #     As we'll keep the initiator choosed candidate (see XEP-0260 § 2.4 #4),
+            #     responder will handle the Socks5 server. Only the server will use
+            #     self.hash_profiles_map to get the profile, so we can ignore the second
+            #     one (the initiator profile).
+            #     There is no easy way to known if the incoming connection
+            #     to the Socks5Server is from initiator or responder, so this seams a
+            #     reasonable workaround.
+        else:
+            self.hash_profiles_map[session_hash] = profile
+
+        return session_data
 
     def streamQuery(self, iq_elt, profile):
         """Get file using byte stream"""
@@ -710,23 +1212,23 @@
         client.xep_0065_current_stream[sid]["streamhost"] = (sh_host, sh_port, sh_jid)
 
         log.info(_("Stream proposed: host=[%(host)s] port=[%(port)s]") % {'host': sh_host, 'port': sh_port})
-        factory = Socks5ClientFactory(client.xep_0065_current_stream, sid, iq_elt["id"], self.activateStream, lambda sid, success, profile: self._killId(sid, success, profile=profile), profile=profile)
+        factory = Socks5ClientFactory(client.xep_0065_current_stream, sid, iq_elt["id"], self.activateStream, lambda sid, success, profile: self._killSession(sid, client), profile=profile)
         reactor.connectTCP(sh_host, int(sh_port), factory)
 
     def activateStream(self, sid, iq_id, profile):
         client = self.host.getClient(profile)
         log.debug(_("activating stream"))
         result = domish.Element((None, 'iq'))
-        data = client.xep_0065_current_stream[sid]
+        session_data = client.xep_0065_current_stream[sid]
         result['type'] = 'result'
         result['id'] = iq_id
-        result['from'] = data["to"].full()
-        result['to'] = data["from"].full()
+        result['from'] = session_data["to"].full()
+        result['to'] = session_data["from"].full()
         query = result.addElement('query', NS_BS)
         query['sid'] = sid
         streamhost = query.addElement('streamhost-used')
-        streamhost['jid'] = data["streamhost"][2]
-        data["xmlstream"].send(result)
+        streamhost['jid'] = session_data["streamhost"][2]
+        session_data["xmlstream"].send(result)
 
     def sendNotAcceptableError(self, iq_id, to_jid, xmlstream):
         """Not acceptable error used when the stream is not expected or something is going wrong
@@ -764,50 +1266,8 @@
         self.plugin_parent = plugin_parent
         self.host = plugin_parent.host
 
-    def _proxyDataResult(self, iq_elt):
-        """Called with the information about proxy according to XEP-0065 #4
-        Params should be filled with these infos"""
-        if iq_elt["type"] == "error":
-            log.warning(_("Can't determine proxy information"))
-            return
-        query_elt = iq_elt.firstChildElement()
-        if query_elt.name != "query":
-            log.warning(_("Bad answer received from proxy"))
-            return
-        streamhost_elts = filter(lambda elt: elt.name == 'streamhost', query_elt.elements())
-        if not streamhost_elts:
-            log.warning(_("No streamhost found in stream query"))
-            return
-        if len(streamhost_elts) != 1:
-            log.warning(_("Multiple streamhost elements in proxy not managed, keeping only the first one"))
-        streamhost_elt = streamhost_elts[0]
-        self.host.memory.setParam("Proxy", streamhost_elt.getAttribute("jid", ""),
-                                  "File Transfer", profile_key=self.parent.profile)
-        self.host.memory.setParam("Proxy host", streamhost_elt.getAttribute("host", ""),
-                                  "File Transfer", profile_key=self.parent.profile)
-        self.host.memory.setParam("Proxy port", streamhost_elt.getAttribute("port", ""),
-                                          "File Transfer", profile_key=self.parent.profile)
-
     def connectionInitialized(self):
-        def connection_ok(dummy):
-            self.xmlstream.addObserver(BS_REQUEST, self.plugin_parent.streamQuery, profile=self.parent.profile)
-            proxy = self.host.memory.getParamA("Proxy", "File Transfer", profile_key=self.parent.profile)
-            if not proxy:
-                def proxiesFound(entities):
-                    try:
-                        proxy_ent = entities.pop()
-                    except KeyError:
-                        log.info(_("No proxy found on this server"))
-                        return
-                    iq_elt = jabber_client.IQ(self.parent.xmlstream, 'get')
-                    iq_elt["to"] = proxy_ent.full()
-                    iq_elt.addElement('query', NS_BS)
-                    iq_elt.addCallback(self._proxyDataResult)
-                    iq_elt.send()
-                d = self.host.findServiceEntities("proxy", "bytestreams", profile_key=self.parent.profile)
-                d.addCallback(proxiesFound)
-        self.parent.getConnectionDeferred().addCallback(connection_ok)
-
+        self.xmlstream.addObserver(BS_REQUEST, self.plugin_parent.streamQuery, profile=self.parent.profile)
 
     def getDiscoInfo(self, requestor, target, nodeIdentifier=''):
         return [disco.DiscoFeature(NS_BS)]
--- a/src/plugins/plugin_xep_0096.py	Mon Nov 02 22:02:41 2015 +0100
+++ b/src/plugins/plugin_xep_0096.py	Mon Nov 02 22:02:41 2015 +0100
@@ -155,23 +155,23 @@
                 dest_path = frontend_data['dest_path']
             except KeyError:
                 log.error(_('dest path not found in frontend_data'))
-                del(client._xep_0096_waiting_for_approval[sid])
+                del client._xep_0096_waiting_for_approval[sid]
                 return
             if stream_method == self.host.plugins["XEP-0065"].NAMESPACE:
-                file_obj = self._getFileObject(dest_path, can_range)
-                range_offset = file_obj.tell()
-                self.host.plugins["XEP-0065"].prepareToReceive(jid.JID(data['from']), sid, file_obj, int(data["size"]), self._transferSucceeded, self._transferFailed, profile)
+                plugin = self.host.plugins["XEP-0065"]
             elif stream_method == self.host.plugins["XEP-0047"].NAMESPACE:
-                file_obj = self._getFileObject(dest_path, can_range)
-                range_offset = file_obj.tell()
-                d = self.host.plugins["XEP-0047"].createSession(file_obj, jid.JID(data['from']), sid, int(data["size"]), profile)
-                d.addCallback(self._transferSucceeded, sid, file_obj, self.host.plugins["XEP-0047"].NAMESPACE, profile)
-                d.addErrback(self._transferFailed, sid, file_obj, self.host.plugins["XEP-0047"].NAMESPACE, profile)
+                plugin = self.host.plugins["XEP-0047"]
             else:
                 log.error(_("Unknown stream method, this should not happen at this stage, cancelling transfer"))
-                del(client._xep_0096_waiting_for_approval[sid])
+                del client._xep_0096_waiting_for_approval[sid]
                 return
 
+            file_obj = self._getFileObject(dest_path, can_range)
+            range_offset = file_obj.tell()
+            d = plugin.createSession(file_obj, jid.JID(data['from']), sid, int(data["size"]), profile)
+            d.addCallback(self._transferSucceeded, sid, file_obj, stream_method, profile)
+            d.addErrback(self._transferFailed, sid, file_obj, stream_method, profile)
+
             #we can send the iq result
             feature_elt = self.host.plugins["XEP-0020"].chooseOption({'stream-method': stream_method})
             misc_elts = []
@@ -249,27 +249,27 @@
             return
 
         range_offset = 0
-        range_length = None
+        # range_length = None
         range_elts = filter(lambda elt: elt.name == 'range', si_elt.elements())
         if range_elts:
             range_elt = range_elts[0]
             range_offset = range_elt.getAttribute("offset", 0)
-            range_length = range_elt.getAttribute("length")
+            # range_length = range_elt.getAttribute("length")
 
         if stream_method == self.host.plugins["XEP-0065"].NAMESPACE:
-            file_obj = open(filepath, 'r')
-            if range_offset:
-                file_obj.seek(range_offset)
-            self.host.plugins["XEP-0065"].startStream(file_obj, jid.JID(IQ['from']), sid, range_length, self._sendSuccessCb, self._sendFailureCb, size, profile)
+            plugin = self.host.plugins["XEP-0065"]
         elif stream_method == self.host.plugins["XEP-0047"].NAMESPACE:
-            file_obj = open(filepath, 'r')
-            if range_offset:
-                file_obj.seek(range_offset)
-            d = self.host.plugins["XEP-0047"].startStream(file_obj, jid.JID(IQ['from']), sid, profile=profile)
-            d.addCallback(self.sendSuccessCb, sid, file_obj, self.host.plugins["XEP-0047"].NAMESPACE, profile)
-            d.addErrback(self.sendFailureCb, sid, file_obj, self.host.plugins["XEP-0047"].NAMESPACE, profile)
+            plugin = self.host.plugins["XEP-0047"]
         else:
-            log.warning(_("Invalid stream method received"))
+            log.error(u"Invalid stream method received")
+            return
+
+        file_obj = open(filepath, 'r')
+        if range_offset:
+            file_obj.seek(range_offset)
+        d = plugin.startStream(file_obj, jid.JID(IQ['from']), sid, profile=profile)
+        d.addCallback(self.sendSuccessCb, sid, file_obj, stream_method, profile)
+        d.addErrback(self.sendFailureCb, sid, file_obj, stream_method, profile)
 
     def sendFile(self, to_jid, filepath, data={}, profile_key=C.PROF_KEY_NONE):
         """send a file using XEP-0096