changeset 1577:d04d7402b8e9

plugins XEP-0020, XEP-0065, XEP-0095, XEP-0096: fixed file copy with Stream Initiation: /!\ range is not working yet /!\ pipe plugin is broken for now
author Goffi <goffi@goffi.org>
date Wed, 11 Nov 2015 18:19:49 +0100
parents d5f59ba166fe
children 7fef6cdf5953
files src/plugins/plugin_xep_0020.py src/plugins/plugin_xep_0065.py src/plugins/plugin_xep_0095.py src/plugins/plugin_xep_0096.py
diffstat 4 files changed, 535 insertions(+), 551 deletions(-) [+]
line wrap: on
line diff
--- a/src/plugins/plugin_xep_0020.py	Wed Nov 11 18:19:49 2015 +0100
+++ b/src/plugins/plugin_xep_0020.py	Wed Nov 11 18:19:49 2015 +0100
@@ -20,7 +20,7 @@
 from sat.core.i18n import _
 from sat.core.log import getLogger
 log = getLogger(__name__)
-from twisted.words.protocols.jabber import client, jid
+from core import exceptions
 from twisted.words.xish import domish
 
 from zope.interface import implements
@@ -55,53 +55,93 @@
 
     def getFeatureElt(self, elt):
         """Check element's children to find feature elements
-        @param elt: domish.Element
-        @return: feature elements"""
-        return [child for child in elt.elements() if child.name == 'feature']
+
+        @param elt(domish.Element): parent element of the feature element
+        @return: feature elements
+        @raise exceptions.NotFound: no feature element found
+        """
+        try:
+            feature_elt = elt.elements(NS_FEATURE_NEG, 'feature').next()
+        except StopIteration:
+            raise exceptions.NotFound
+        return feature_elt
+
+    def _getForm(self, elt, namespace):
+        """Return the first child data form
 
-    def getChoosedOptions(self, elt):
+        @param elt(domish.Element): parent of the data form
+        @param namespace (None, unicode): form namespace or None to ignore
+        @return (None, data_form.Form): data form or None is nothing is found
+        """
+        if namespace is None:
+            try:
+                form_elt = elt.elements(data_form.NS_X_DATA).next()
+            except StopIteration:
+                return None
+            else:
+                return data_form.Form.fromElement(form_elt)
+        else:
+            return data_form.findForm(elt, namespace)
+
+    def getChoosedOptions(self, feature_elt, namespace):
         """Return choosed feature for feature element
-        @param elt: feature domish element
-        @return: dict with feature name as key, and choosed option as value"""
-        form = data_form.Form.fromElement(elt.firstChildElement())
+
+        @param feature_elt(domish.Element): feature domish element
+        @param namespace (None, unicode): form namespace or None to ignore
+        @return (dict): feature name as key, and choosed option as value
+        @raise exceptions.NotFound: not data form is found
+        """
+        form = self._getForm(feature_elt, namespace)
+        if form is None:
+            raise exceptions.NotFound
         result = {}
         for field in form.fields:
             values = form.fields[field].values
             result[field] = values[0] if values else None
             if len(values) > 1:
-                log.warning(_(u"More than one value choosed for %s, keeping the first one") % field)
+                log.warning(_(u"More than one value choosed for {}, keeping the first one").format(field))
         return result
 
-    def negociate(self, feature_elt, form_type, negociable_values):
-        """Negociate the feature options
-        @param feature_elt: feature domish element
-        @param form_type: the option to negociate
-        @param negociable_values: acceptable values for this negociation"""
-        form = data_form.Form.fromElement(feature_elt.firstChildElement())
-        options = [option.value for option in form.fields[form_type].options]
-        for value in negociable_values:
+    def negotiate(self, feature_elt, name, negotiable_values, namespace):
+        """Negotiate the feature options
+
+        @param feature_elt(domish.Element): feature element
+        @param name: the option name (i.e. field's var attribute) to negotiate
+        @param negotiable_values(iterable): acceptable values for this negotiation
+            first corresponding value will be returned
+        @param namespace (None, unicode): form namespace or None to ignore
+        @raise KeyError: name is not found in data form fields
+        """
+        form = self._getForm(feature_elt, namespace)
+        options = [option.value for option in form.fields[name].options]
+        for value in negotiable_values:
             if value in options:
                 return value
         return None
 
-    def chooseOption(self, options_dict):
+    def chooseOption(self, options, namespace):
         """Build a feature element with choosed options
-        @param options_dict: dict with feature as key and choosed option as value"""
+
+        @param options(dict): dict with feature as key and choosed option as value
+        @param namespace (None, unicode): form namespace or None to ignore
+        """
         feature_elt = domish.Element((NS_FEATURE_NEG, 'feature'))
-        x_form = data_form.Form('submit')
-        x_form.makeFields(options_dict)
+        x_form = data_form.Form('submit', formNamespace=namespace)
+        x_form.makeFields(options)
         feature_elt.addChild(x_form.toElement())
         return feature_elt
 
-    def proposeFeatures(self, options_dict, namespace=None):
+    def proposeFeatures(self, options_dict, namespace):
         """Build a feature element with options to propose
-        @param options_dict: dict with feature as key and list of acceptable options as value
-        @param namespace: feature namespace"""
+
+        @param options_dict(dict): dict with feature as key and iterable of acceptable options as value
+        @param namespace(None, unicode): feature namespace
+        """
         feature_elt = domish.Element((NS_FEATURE_NEG, 'feature'))
         x_form = data_form.Form('form', formNamespace=namespace)
         for field in options_dict:
             x_form.addField(data_form.Field('list-single', field,
-                            options=[data_form.Option(_option) for _option in options_dict[field]]))
+                            options=[data_form.Option(option) for option in options_dict[field]]))
         feature_elt.addChild(x_form.toElement())
         return feature_elt
 
--- a/src/plugins/plugin_xep_0065.py	Wed Nov 11 18:19:49 2015 +0100
+++ b/src/plugins/plugin_xep_0065.py	Wed Nov 11 18:19:49 2015 +0100
@@ -63,13 +63,12 @@
 from twisted.internet import protocol
 from twisted.internet import reactor
 from twisted.internet import error as internet_error
-from twisted.words.protocols.jabber import jid, client as jabber_client
 from twisted.words.protocols.jabber import error as jabber_error
+from twisted.words.protocols.jabber import jid
+from twisted.words.protocols.jabber import xmlstream
 from twisted.protocols.basic import FileSender
-from twisted.words.xish import domish
 from twisted.internet import defer
 from twisted.python import failure
-from sat.core.exceptions import ProfileNotInCacheError
 from collections import namedtuple
 import struct
 import hashlib
@@ -432,7 +431,6 @@
             return None
 
     def _makeRequest(self):
-        # sha1 = getSessionHash(self.data["from"], self.data["to"], self.sid)
         hash_ = self._session_hash
         request = struct.pack('!5B%dsH' % len(hash_), SOCKS5_VER, CMD_CONNECT, 0, ADDR_DOMAINNAME, len(hash_), hash_, 0)
         self.transport.write(request)
@@ -464,13 +462,8 @@
                 self.loseConnection()
                 return
 
-            # if self.factory.proxy:
-            #     self.state = STATE_READY
-            #     self.factory.activateCb(self.sid, self.factory.iq_id, self.startTransfer, self.profile)
-            # else:
             self.state = STATE_READY
             self.connection.callback(None)
-            # self.factory.activateCb(self.sid, self.factory.iq_id, self.profile)
 
         except struct.error:
             # The buffer is probably not complete, we need to wait more
@@ -489,9 +482,6 @@
                 .format(host=self.transport.getPeer().host))
             return
         self._session_hash = addr
-        # self.sid, self.profile = self.factory.hash_profiles_map[addr]
-        # client = self.factory.host.getClient(self.profile)
-        # client.xep_0065_current_stream[self.sid]["start_transfer_cb"] = self.startTransfer
         self.connectCompleted(addr, 0)
 
     def startTransfer(self):
@@ -546,11 +536,6 @@
 
     def connectionLost(self, reason):
         log.debug(u"Socks5 connection lost: {}".format(reason.value))
-        # self.transport.unregisterProducer()
-        # if self.peersock is not None:
-        #     self.peersock.peersock = None
-        #     self.peersock.transport.unregisterProducer()
-        #     self.peersock = None
         if self.state != STATE_READY:
             self.connection.errback(reason)
         if self.server_mode :
@@ -620,7 +605,6 @@
 class Socks5ClientFactory(protocol.ClientFactory):
     protocol = SOCKSv5
 
-    # def __init__(self, stream_data, sid, iq_id, activateCb, finishedCb, proxy=False, profile=C.PROF_KEY_NONE):
     def __init__(self, parent, session_hash, profile):
         """Init the Client Factory
 
@@ -635,13 +619,6 @@
         self._protocol_instance = None
         self.connector = None
         self._discarded = False
-        # self.data = stream_data[sid]
-        # self.sid = sid
-        # self.iq_id = iq_id
-        # self.activateCb = activateCb
-        # self.finishedCb = finishedCb
-        # self.proxy = proxy
-        # self.profile = profile
 
     def discard(self):
         """Disconnect the client
@@ -671,7 +648,6 @@
                 self.getSession()[DEFER_KEY].callback(None)
             else:
                 self.getSession()[DEFER_KEY].errback(reason)
-        # self.finishedCb(self.sid, reason.type == internet_error.ConnectionDone, self.profile)  # TODO: really check if the state is actually successful
 
     def buildProtocol(self, addr):
         log.debug(("Socks 5 client connection started"))
@@ -719,7 +695,7 @@
 
     def profileConnected(self, profile):
         client = self.host.getClient(profile)
-        client.xep_0065_current_stream = {}  # key: stream_id, value: session_data(dict)
+        client.xep_0065_sid_session = {}  # key: stream_id, value: session_data(dict)
         client._s5b_sessions = {}
 
     def getSessionHash(self, from_jid, to_jid, sid):
@@ -776,7 +752,7 @@
             notFound(server)
         iq_elt = client.IQ('get')
         iq_elt['to'] = proxy.full()
-        iq_elt.addElement('query', NS_BS)
+        iq_elt.addElement((NS_BS, 'query'))
 
         try:
             result_elt = yield iq_elt.send()
@@ -914,6 +890,14 @@
         return defers_list
 
     def getBestCandidate(self, candidates, session_hash, profile=C.PROF_KEY_NONE):
+        """Get best candidate (according to priority) which can connect
+
+        @param candidates(iterable[Candidate]): candidates to test
+        @param session_hash(unicode): hash of the session
+            hash is the same as hostname computer in XEP-0065 ยง 5.3.2 #1
+        @param profile: %(doc_profile)s
+        @return (D(None, Candidate)): best candidate or None if none can connect
+        """
         defer_candidates = None
 
         def connectionCb(candidate, profile):
@@ -947,7 +931,7 @@
 
     def _timeOut(self, sid, client):
         """Delecte current_stream id, called after timeout
-        @param id: id of client.xep_0065_current_stream"""
+        @param id: id of client.xep_0065_sid_session"""
         log.info(_("Socks5 Bytestream: TimeOut reached for id {sid} [{profile}]").format(
             sid=sid, profile=client.profile))
         self._killSession(sid, client, u"TIMEOUT")
@@ -961,7 +945,7 @@
             else, will be used to call failure_cb
         """
         try:
-            session = client.xep_0065_current_stream[sid]
+            session = client.xep_0065_sid_session[sid]
         except KeyError:
             log.warning(_("kill id called on a non existant id"))
             return
@@ -976,7 +960,7 @@
         if session['timer'].active():
             session['timer'].cancel()
 
-        del client.xep_0065_current_stream[sid]
+        del client.xep_0065_sid_session[sid]
 
         # FIXME: to check
         try:
@@ -1004,105 +988,73 @@
         @param successCb: method to call when stream successfuly finished
         @param failureCb: method to call when something goes wrong
         @param profile: %(doc_profile)s
+        @return (D): Deferred fired when session is finished
         """
         client = self.host.getClient(profile)
-        session_data = self._createSession(file_obj, to_jid, sid, client.profile)
-
-        session_data["to"] = to_jid
-        session_data["xmlstream"] = client.xmlstream
-        hash_ = session_data["hash"] = getSessionHash(client.jid, to_jid, sid)
+        session_data = self._createSession(file_obj, to_jid, sid, True, client.profile)
 
-        self.hash_profiles_map[hash_] = (sid, profile)
-
-        iq_elt = jabber_client.IQ(client.xmlstream, 'set')
-        iq_elt["from"] = client.jid.full()
-        iq_elt["to"] = to_jid.full()
-        query_elt = iq_elt.addElement('query', NS_BS)
-        query_elt['mode'] = 'tcp'
-        query_elt['sid'] = sid
+        session_data[client] = client
 
-        #first streamhost: direct connection
-        streamhost = query_elt.addElement('streamhost')
-        streamhost['host'] = self.host.memory.getParamA("IP", "File Transfer")
-        streamhost['port'] = self.host.memory.getParamA("Port", "File Transfer")
-        streamhost['jid'] = client.jid.full()
+        def gotCandidates(candidates):
+            session_data['candidates'] = candidates
+            iq_elt = client.IQ()
+            iq_elt["from"] = client.jid.full()
+            iq_elt["to"] = to_jid.full()
+            query_elt = iq_elt.addElement((NS_BS, 'query'))
+            query_elt['mode'] = 'tcp'
+            query_elt['sid'] = sid
 
-        #second streamhost: mediated connection, using proxy
-        streamhost = query_elt.addElement('streamhost')
-        streamhost['host'] = self.host.memory.getParamA("Proxy host", "File Transfer", profile_key=profile)
-        streamhost['port'] = self.host.memory.getParamA("Proxy port", "File Transfer", profile_key=profile)
-        streamhost['jid'] = self.host.memory.getParamA("Proxy", "File Transfer", profile_key=profile)
+            for candidate in candidates:
+                streamhost = query_elt.addElement('streamhost')
+                streamhost['host'] = candidate.host
+                streamhost['port'] = str(candidate.port)
+                streamhost['jid'] = candidate.jid.full()
 
-        iq_elt.addCallback(self._IQOpen, session_data, client)
-        iq_elt.send()
+            d = iq_elt.send()
+            args = [session_data, client]
+            d.addCallbacks(self._IQNegotiationCb, self._IQNegotiationEb, args, None, args)
+
+        self.getCandidates(profile).addCallback(gotCandidates)
         return session_data[DEFER_KEY]
 
-    def _IQOpen(self, session_data, client, iq_elt):
+    def _IQNegotiationCb(self, iq_elt, session_data, client):
         """Called when the result of open iq is received
 
         @param session_data(dict): data of the session
         @param client: %(doc_client)s
         @param iq_elt(domish.Element): <iq> result
         """
-        sid = session_data['id']
-        if iq_elt["type"] == "error":
-            log.warning(_("Socks5 transfer failed"))
-            # FIXME: must clean session
-            return
-
         try:
-            session_data = client.xep_0065_current_stream[sid]
-            file_obj = session_data["file_obj"]
-            timer = session_data["timer"]
-        except KeyError:
-            raise exceptions.InternalError
-
-        timer.reset(TIMEOUT)
-
-        query_elt = iq_elt.elements(NS_BS, 'query').next()
-        streamhost_elts = list(query_elt.elements(NS_BS, 'streamhost-used'))
-
-        if not streamhost_elts:
-            log.warning(_("No streamhost found in stream query"))
+            query_elt = iq_elt.elements(NS_BS, 'query').next()
+            streamhost_used_elt = query_elt.elements(NS_BS, 'streamhost-used').next()
+        except StopIteration:
+            log.warning(u"No streamhost found in stream query")
             # FIXME: must clean session
             return
 
-        # FIXME: must be cleaned !
-
-        streamhost_jid = streamhost_elts[0]['jid']
-        if streamhost_jid != client.jid.full():
-            log.debug(_("A proxy server is used"))
-            proxy_host = self.host.memory.getParamA("Proxy host", "File Transfer", profile_key=client.profile)
-            proxy_port = self.host.memory.getParamA("Proxy port", "File Transfer", profile_key=client.profile)
-            proxy_jid = self.host.memory.getParamA("Proxy", "File Transfer", profile_key=client.profile)
-            if proxy_jid != streamhost_jid:
-                log.warning(_("Proxy jid is not the same as in parameters, this should not happen"))
-                return
-            factory = Socks5ClientFactory(client.xep_0065_current_stream, sid, None, self.activateProxyStream, lambda sid, success, profile: self._killSession(sid, client), True, client.profile)
-            reactor.connectTCP(proxy_host, int(proxy_port), factory)
-        else:
-            session_data["start_transfer_cb"](file_obj)  # We now activate the stream
+        streamhost_jid = jid.JID(streamhost_used_elt['jid'])
+        try:
+            candidate = (c for c in session_data['candidates'] if c.jid == streamhost_jid).next()
+        except StopIteration:
+            log.warning(u"Candidate [{jid}] is unknown !".format(jid=streamhost_jid.full()))
+            return
 
-    def activateProxyStream(self, sid, iq_id, start_transfer_cb, profile):
-        log.debug(_("activating stream"))
-        client = self.host.getClient(profile)
-        session_data = client.xep_0065_current_stream[sid]
+        if candidate.type == XEP_0065.TYPE_PROXY:
+            log.info(u"A Socks5 proxy is used")
+            d = self.connectCandidate(candidate, session_data['hash'], profile=client.profile)
+            d.addCallback(lambda dummy: candidate.activate(session_data['id'], session_data['peer_jid'], client))
+            d.addErrback(self._activationEb)
+        else:
+            d = defer.succeed(None)
 
-        iq_elt = client.IQ(client.xmlstream, 'set')
-        iq_elt["from"] = client.jid.full()
-        iq_elt["to"] = self.host.memory.getParamA("Proxy", "File Transfer", profile_key=profile)
-        query_elt = iq_elt.addElement('query', NS_BS)
-        query_elt['sid'] = sid
-        query_elt.addElement('activate', content=session_data['to'].full())
-        iq_elt.addCallback(self.proxyResult, sid, start_transfer_cb, session_data['file_obj'])
-        iq_elt.send()
+        d.addCallback(lambda dummy: candidate.startTransfer(session_data['hash']))
 
-    def proxyResult(self, sid, start_transfer_cb, file_obj, iq_elt):
-        if iq_elt['type'] == 'error':
-            log.warning(_("Can't activate the proxy stream"))
-            return
-        else:
-            start_transfer_cb(file_obj)
+    def _activationEb(self, failure):
+        log.warning(u"Proxy activation error: {}".format(failure.value))
+
+    def _IQNegotiationEb(self, stanza_err, session_data, client):
+        log.warning(u"Socks5 transfer failed: {}".format(stanza_err.condition))
+        # FIXME: must clean session
 
     def createSession(self, *args, **kwargs):
         """like [_createSession] but return the session deferred instead of the whole session
@@ -1111,26 +1063,34 @@
         """
         return self._createSession(*args, **kwargs)[DEFER_KEY]
 
-    def _createSession(self, file_obj, to_jid, sid, profile):
+    def _createSession(self, file_obj, to_jid, sid, requester=False, profile=C.PROF_KEY_NONE):
         """Called when a bytestream is imminent
 
         @param file_obj(file): File object where data will be written
         @param to_jid(jid.JId): jid of the other peer
         @param sid(unicode): session id
+        @param initiator(bool): if True, this session is create by initiator
         @param profile: %(doc_profile)s
         @return (dict): session data
         """
         client = self.host.getClient(profile)
-        if sid in client.xep_0065_current_stream:
+        if sid in client.xep_0065_sid_session:
             raise exceptions.ConflictError(u'A session with this id already exists !')
-        session_data = client.xep_0065_current_stream[sid] = \
+        if requester:
+            session_hash = getSessionHash(client.jid, to_jid, sid)
+            session_data = self._registerHash(session_hash, file_obj, profile)
+        else:
+            session_hash = getSessionHash(to_jid, client.jid, sid)
+            session_data = client._s5b_sessions[session_hash] = {
+                DEFER_KEY: defer.Deferred(),
+                }
+        client.xep_0065_sid_session[sid] = session_data
+        session_data.update(
             {'id': sid,
-             DEFER_KEY: defer.Deferred(),
-             'to': to_jid,
-             'file_obj': file_obj,
-             'seq': -1, # FIXME: to check
-             'timer': reactor.callLater(TIMEOUT, self._timeOut, sid, client),
-            }
+             'peer_jid': to_jid,
+             'file': file_obj,
+             'hash': session_hash,
+            })
 
         return session_data
 
@@ -1156,7 +1116,7 @@
         return client._s5b_sessions[session_hash]
 
     def registerHash(self, *args, **kwargs):
-        """like [_registerHash] but resutrn the session deferred instead of the whole session
+        """like [_registerHash] but resturn the session deferred instead of the whole session
         session deferred is fired when transfer is finished
         """
         return self._registerHash(*args, **kwargs)[DEFER_KEY]
@@ -1195,91 +1155,62 @@
         return session_data
 
     def streamQuery(self, iq_elt, profile):
-        """Get file using byte stream"""
-        log.debug(_("BS stream query"))
+        log.debug(u"BS stream query")
         client = self.host.getClient(profile)
 
-        if not client:
-            raise ProfileNotInCacheError
-
-        xmlstream = client.xmlstream
-
         iq_elt.handled = True
-        query_elt = iq_elt.firstChildElement()
-        sid = query_elt.getAttribute("sid")
-        streamhost_elts = filter(lambda elt: elt.name == 'streamhost', query_elt.elements())
 
-        if not sid in client.xep_0065_current_stream:
-            log.warning(_(u"Ignoring unexpected BS transfer: %s" % sid))
-            self.sendNotAcceptableError(iq_elt['id'], iq_elt['from'], xmlstream)
-            return
+        query_elt = iq_elt.elements(NS_BS, 'query').next()
+        try:
+            sid = query_elt['sid']
+        except KeyError:
+            log.warning(u"Invalid bystreams request received")
+            return client.sendError(iq_elt, "bad-request")
 
-        client.xep_0065_current_stream[sid]['timer'].cancel()
-        client.xep_0065_current_stream[sid]["to"] = jid.JID(iq_elt["to"])
-        client.xep_0065_current_stream[sid]["xmlstream"] = xmlstream
-
+        streamhost_elts = list(query_elt.elements(NS_BS, 'streamhost'))
         if not streamhost_elts:
-            log.warning(_(u"No streamhost found in stream query %s" % sid))
-            self.sendBadRequestError(iq_elt['id'], iq_elt['from'], xmlstream)
-            return
+            return client.sendError(iq_elt, "bad-request")
 
-        streamhost_elt = streamhost_elts[0]  # TODO: manage several streamhost elements case
-        sh_host = streamhost_elt.getAttribute("host")
-        sh_port = streamhost_elt.getAttribute("port")
-        sh_jid = streamhost_elt.getAttribute("jid")
-        if not sh_host or not sh_port or not sh_jid:
-            log.warning(_("incomplete streamhost element"))
-            self.sendBadRequestError(iq_elt['id'], iq_elt['from'], xmlstream)
-            return
+        try:
+            session_data = client.xep_0065_sid_session[sid]
+        except KeyError:
+            log.warning(u"Ignoring unexpected BS transfer: {}".format(sid))
+            return client.sendError(iq_elt, 'not-acceptable')
 
-        client.xep_0065_current_stream[sid]["streamhost"] = (sh_host, sh_port, sh_jid)
-
-        log.info(_("Stream proposed: host=[%(host)s] port=[%(port)s]") % {'host': sh_host, 'port': sh_port})
-        factory = Socks5ClientFactory(client.xep_0065_current_stream, sid, iq_elt["id"], self.activateStream, lambda sid, success, profile: self._killSession(sid, client), profile=profile)
-        reactor.connectTCP(sh_host, int(sh_port), factory)
+        peer_jid = session_data["peer_jid"] = jid.JID(iq_elt["from"])
 
-    def activateStream(self, sid, iq_id, profile):
-        client = self.host.getClient(profile)
-        log.debug(_("activating stream"))
-        result = domish.Element((None, 'iq'))
-        session_data = client.xep_0065_current_stream[sid]
-        result['type'] = 'result'
-        result['id'] = iq_id
-        result['from'] = session_data["to"].full()
-        result['to'] = session_data["from"].full()
-        query = result.addElement('query', NS_BS)
-        query['sid'] = sid
-        streamhost = query.addElement('streamhost-used')
-        streamhost['jid'] = session_data["streamhost"][2]
-        session_data["xmlstream"].send(result)
+        candidates = []
+        nb_sh = len(streamhost_elts)
+        for idx, sh_elt in enumerate(streamhost_elts):
+            try:
+                host, port, jid_ = sh_elt['host'], sh_elt['port'], jid.JID(sh_elt['jid'])
+            except KeyError:
+                log.warning(u"malformed streamhost element")
+                return client.sendError(iq_elt, "bad-request")
+            priority = nb_sh - idx
+            if jid_.userhostJID() != peer_jid.userhostJID():
+                type_ = XEP_0065.TYPE_PROXY
+            else:
+                type_ = XEP_0065.TYPE_DIRECT
+            candidates.append(Candidate(host, port, type_, priority, jid_))
 
-    def sendNotAcceptableError(self, iq_id, to_jid, xmlstream):
-        """Not acceptable error used when the stream is not expected or something is going wrong
-        @param iq_id: IQ id
-        @param to_jid: addressee
-        @param xmlstream: XML stream to use to send the error"""
-        result = domish.Element((None, 'iq'))
-        result['type'] = 'result'
-        result['id'] = iq_id
-        result['to'] = to_jid
-        error_el = result.addElement('error')
-        error_el['type'] = 'modify'
-        error_el.addElement(('urn:ietf:params:xml:ns:xmpp-stanzas', 'not-acceptable'))
-        xmlstream.send(result)
+        for candidate in candidates:
+            log.info(u"Candidate proposed: {}".format(candidate))
+
+        d = self.getBestCandidate(candidates, session_data['hash'], profile)
+        d.addCallback(self._ackStream, iq_elt, session_data, client)
 
-    def sendBadRequestError(self, iq_id, to_jid, xmlstream):
-        """Not acceptable error used when the stream is not expected or something is going wrong
-        @param iq_id: IQ id
-        @param to_jid: addressee
-        @param xmlstream: XML stream to use to send the error"""
-        result = domish.Element((None, 'iq'))
-        result['type'] = 'result'
-        result['id'] = iq_id
-        result['to'] = to_jid
-        error_el = result.addElement('error')
-        error_el['type'] = 'cancel'
-        error_el.addElement(('urn:ietf:params:xml:ns:xmpp-stanzas', 'bad-request'))
-        xmlstream.send(result)
+    def _ackStream(self, candidate, iq_elt, session_data, client):
+        if candidate is None:
+            log.info("No streamhost candidate worked, we have to end negotiation")
+            return client.sendError(iq_elt, 'item-not-found')
+        log.debug(u"activating stream")
+        result_elt = xmlstream.toResponse(iq_elt, 'result')
+        query_elt = result_elt.addElement((NS_BS, 'query'))
+        query_elt['sid'] = session_data['id']
+        streamhost_used_elt = query_elt.addElement('streamhost-used')
+        streamhost_used_elt['jid'] = candidate.jid.full()
+        client.xmlstream.send(result_elt)
 
 
 class XEP_0065_handler(XMPPHandler):
--- a/src/plugins/plugin_xep_0095.py	Wed Nov 11 18:19:49 2015 +0100
+++ b/src/plugins/plugin_xep_0095.py	Wed Nov 11 18:19:49 2015 +0100
@@ -21,23 +21,14 @@
 from sat.core.constants import Const as C
 from sat.core.log import getLogger
 log = getLogger(__name__)
-from twisted.words.xish import domish
-from twisted.words.protocols.jabber import client
+from sat.core import exceptions
+from twisted.words.protocols.jabber import xmlstream
+from twisted.words.protocols.jabber import error
+from zope.interface import implements
+from wokkel import disco
+from wokkel import iwokkel
 import uuid
 
-from zope.interface import implements
-
-try:
-    from twisted.words.protocols.xmlstream import XMPPHandler
-except ImportError:
-    from wokkel.subprotocols import XMPPHandler
-
-from wokkel import disco, iwokkel
-
-IQ_SET = '/iq[@type="set"]'
-NS_SI = 'http://jabber.org/protocol/si'
-SI_REQUEST = IQ_SET + '/si[@xmlns="' + NS_SI + '"]'
-SI_PROFILE_HEADER = "http://jabber.org/protocol/si/profile/"
 
 PLUGIN_INFO = {
     "name": "XEP 0095 Plugin",
@@ -50,6 +41,13 @@
 }
 
 
+IQ_SET = '/iq[@type="set"]'
+NS_SI = 'http://jabber.org/protocol/si'
+SI_REQUEST = IQ_SET + '/si[@xmlns="' + NS_SI + '"]'
+SI_PROFILE_HEADER = "http://jabber.org/protocol/si/profile/"
+SI_ERROR_CONDITIONS = ('bad-profile', 'no-valid-streams')
+
+
 class XEP_0095(object):
 
     def __init__(self, host):
@@ -62,129 +60,105 @@
 
     def registerSIProfile(self, si_profile, callback):
         """Add a callback for a SI Profile
-        param si_profile: SI profile name (e.g. file-transfer)
-        param callback: method to call when the profile name is asked"""
+
+        @param si_profile(unicode): SI profile name (e.g. file-transfer)
+        @param callback(callable): method to call when the profile name is asked
+        """
         self.si_profiles[si_profile] = callback
 
-    def streamInit(self, iq_el, profile):
+    def unregisterSIProfile(self, si_profile):
+        try:
+            del self.si_profiles[si_profile]
+        except KeyError:
+            log.error(u"Trying to unregister SI profile [{}] which was not registered".format(si_profile))
+
+    def streamInit(self, iq_elt, profile):
         """This method is called on stream initiation (XEP-0095 #3.2)
-        @param iq_el: IQ element
+
+        @param iq_elt: IQ element
         @param profile: %(doc_profile)s"""
         log.info(_("XEP-0095 Stream initiation"))
-        iq_el.handled = True
-        si_el = iq_el.firstChildElement()
-        si_id = si_el.getAttribute('id')
-        si_mime_type = iq_el.getAttribute('mime-type', 'application/octet-stream')
-        si_profile = si_el.getAttribute('profile')
+        iq_elt.handled = True
+        si_elt = iq_elt.elements(NS_SI, 'si').next()
+        si_id = si_elt['id']
+        si_mime_type = iq_elt.getAttribute('mime-type', 'application/octet-stream')
+        si_profile = si_elt['profile']
         si_profile_key = si_profile[len(SI_PROFILE_HEADER):] if si_profile.startswith(SI_PROFILE_HEADER) else si_profile
         if si_profile_key in self.si_profiles:
             #We know this SI profile, we call the callback
-            self.si_profiles[si_profile_key](iq_el['id'], iq_el['from'], si_id, si_mime_type, si_el, profile)
+            self.si_profiles[si_profile_key](iq_elt, si_id, si_mime_type, si_elt, profile)
         else:
             #We don't know this profile, we send an error
-            self.sendBadProfileError(iq_el['id'], iq_el['from'], profile)
+            self.sendError(iq_elt, 'bad-profile', profile)
 
-    def sendRejectedError(self, iq_id, to_jid, reason='Offer Declined', profile=C.PROF_KEY_NONE):
-        """Helper method to send when the stream is rejected
-        @param iq_id: IQ id
-        @param to_jid: recipient
-        @param reason: human readable reason (string)
-        @param profile: %(doc_profile)s"""
-        self.sendError(iq_id, to_jid, 403, 'cancel', {'text': reason}, profile=profile)
-
-    def sendBadProfileError(self, iq_id, to_jid, profile):
-        """Helper method to send when we don't know the SI profile
-        @param iq_id: IQ id
-        @param to_jid: recipient
-        @param profile: %(doc_profile)s"""
-        self.sendError(iq_id, to_jid, 400, 'modify', profile=profile)
+    def sendError(self, request, condition, profile):
+        """Send IQ error as a result
 
-    def sendBadRequestError(self, iq_id, to_jid, profile):
-        """Helper method to send when we don't know the SI profile
-        @param iq_id: IQ id
-        @param to_jid: recipient
-        @param profile: %(doc_profile)s"""
-        self.sendError(iq_id, to_jid, 400, 'cancel', profile=profile)
+        @param request(domish.Element): original IQ request
+        @param condition(str): error condition
+        @param profile: %(doc_profile)s
+        """
+        client = self.host.getClient(profile)
+        if condition in SI_ERROR_CONDITIONS:
+            si_condition = condition
+            condition = 'bad-request'
+        else:
+            si_condition = None
 
-    def sendFailedError(self, iq_id, to_jid, profile):
-        """Helper method to send when we transfer failed
-        @param iq_id: IQ id
-        @param to_jid: recipient
-        @param profile: %(doc_profile)s"""
-        self.sendError(iq_id, to_jid, 500, 'cancel', {'custom': 'failed'}, profile=profile)  # as there is no lerror code for failed transfer, we use 500 (undefined-condition)
+        iq_error_elt = error.StanzaError(condition).toResponse(request)
+        if si_condition is not None:
+            iq_error_elt.error.addElement((NS_SI, si_condition))
+
+        client.xmlstream.send(iq_error_elt)
 
-    def sendError(self, iq_id, to_jid, err_code, err_type='cancel', data={}, profile=C.PROF_KEY_NONE):
-        """Send IQ error as a result
-        @param iq_id: IQ id
-        @param to_jid: recipient
-        @param err_code: error err_code (see XEP-0095 #4.2)
-        @param err_type: one of cancel, modify
-        @param data: error specific data (dictionary)
+    def acceptStream(self, iq_elt, feature_elt, misc_elts=None, profile=C.PROF_KEY_NONE):
+        """Send the accept stream initiation answer
+
+        @param iq_elt(domish.Element): initial SI request
+        @param feature_elt(domish.Element): 'feature' element containing stream method to use
+        @param misc_elts(list[domish.Element]): list of elements to add
         @param profile: %(doc_profile)s
         """
-        client_ = self.host.getClient(profile)
-        result = domish.Element((None, 'iq'))
-        result['type'] = 'result'
-        result['id'] = iq_id
-        result['to'] = to_jid
-        error_el = result.addElement('error')
-        error_el['err_code'] = str(err_code)
-        error_el['type'] = err_type
-        if err_code == 400 and err_type == 'cancel':
-            error_el.addElement(('urn:ietf:params:xml:ns:xmpp-stanzas', 'bad-request'))
-            error_el.addElement((NS_SI, 'no-valid-streams'))
-        elif err_code == 400 and err_type == 'modify':
-            error_el.addElement(('urn:ietf:params:xml:ns:xmpp-stanzas', 'bad-request'))
-            error_el.addElement((NS_SI, 'bad-profile'))
-        elif err_code == 403 and err_type == 'cancel':
-            error_el.addElement(('urn:ietf:params:xml:ns:xmpp-stanzas', 'forbidden'))
-            if 'text' in data:
-                error_el.addElement(('urn:ietf:params:xml:ns:xmpp-stanzas', 'text'), content=data['text'])
-        elif err_code == 500 and err_type == 'cancel':
-            condition_el = error_el.addElement((NS_SI, 'undefined-condition'))
-            if 'custom' in data and data['custom'] == 'failed':
-                condition_el.addContent('Stream failed')
-
-        client_.xmlstream.send(result)
+        log.info(_("sending stream initiation accept answer"))
+        if misc_elts is None:
+            misc_elts = []
+        client = self.host.getClient(profile)
+        result_elt = xmlstream.toResponse(iq_elt, 'result')
+        si_elt = result_elt.addElement((NS_SI, 'si'))
+        si_elt.addChild(feature_elt)
+        for elt in misc_elts:
+            si_elt.addChild(elt)
+        client.xmlstream.send(result_elt)
 
-    def acceptStream(self, iq_id, to_jid, feature_elt, misc_elts=[], profile=C.PROF_KEY_NONE):
-        """Send the accept stream initiation answer
-        @param iq_id: IQ id
-        @param feature_elt: domish element 'feature' containing stream method to use
-        @param misc_elts: list of domish element to add
-        @param profile: %(doc_profile)s"""
-        _client = self.host.getClient(profile)
-        assert(_client)
-        log.info(_("sending stream initiation accept answer"))
-        result = domish.Element((None, 'iq'))
-        result['type'] = 'result'
-        result['id'] = iq_id
-        result['to'] = to_jid
-        si = result.addElement('si', NS_SI)
-        si.addChild(feature_elt)
-        for elt in misc_elts:
-            si.addChild(elt)
-        _client.xmlstream.send(result)
+    def _parseOfferResult(self, iq_elt):
+        try:
+            si_elt = iq_elt.elements(NS_SI, "si").next()
+        except StopIteration:
+            log.warning(u"No <si/> element found in result while expected")
+            raise exceptions.DataError
+        return (iq_elt, si_elt)
+
+
+    def proposeStream(self, to_jid, si_profile, feature_elt, misc_elts, mime_type='application/octet-stream', profile=C.PROF_KEY_NONE):
+        """Propose a stream initiation
 
-    def proposeStream(self, to_jid, si_profile, feature_elt, misc_elts, mime_type='application/octet-stream', profile_key=C.PROF_KEY_NONE):
-        """Propose a stream initiation
-        @param to_jid: recipient (JID)
-        @param si_profile: Stream initiation profile (XEP-0095)
-        @param feature_elt: feature domish element, according to XEP-0020
-        @param misc_elts: list of domish element to add for this profile
-        @param mime_type: stream mime type
+        @param to_jid(jid.JID): recipient
+        @param si_profile(unicode): Stream initiation profile (XEP-0095)
+        @param feature_elt(domish.Element): feature element, according to XEP-0020
+        @param misc_elts(list[domish.Element]): list of elements to add
+        @param mime_type(unicode): stream mime type
         @param profile: %(doc_profile)s
-        @return: session id, offer"""
-        current_jid, xmlstream = self.host.getJidNStream(profile_key)
-        if not xmlstream:
-            log.error(_('Asking for an non-existant or not connected profile'))
-            return ""
-
-        offer = client.IQ(xmlstream, 'set')
+        @return (tuple): tuple with:
+            - session id (unicode)
+            - (D(domish_elt, domish_elt): offer deferred which returl a tuple
+                with iq_elt and si_elt
+        """
+        client = self.host.getClient(profile)
+        offer = client.IQ()
         sid = str(uuid.uuid4())
         log.debug(_(u"Stream Session ID: %s") % offer["id"])
 
-        offer["from"] = current_jid.full()
+        offer["from"] = client.jid.full()
         offer["to"] = to_jid.full()
         si = offer.addElement('si', NS_SI)
         si['id'] = sid
@@ -194,11 +168,12 @@
             si.addChild(elt)
         si.addChild(feature_elt)
 
-        offer.send()
-        return sid, offer
+        offer_d = offer.send()
+        offer_d.addCallback(self._parseOfferResult)
+        return sid, offer_d
 
 
-class XEP_0095_handler(XMPPHandler):
+class XEP_0095_handler(xmlstream.XMPPHandler):
     implements(iwokkel.IDisco)
 
     def __init__(self, plugin_parent):
@@ -209,7 +184,7 @@
         self.xmlstream.addObserver(SI_REQUEST, self.plugin_parent.streamInit, profile=self.parent.profile)
 
     def getDiscoInfo(self, requestor, target, nodeIdentifier=''):
-        return [disco.DiscoFeature(NS_SI)] + [disco.DiscoFeature("http://jabber.org/protocol/si/profile/%s" % profile_name) for profile_name in self.plugin_parent.si_profiles]
+        return [disco.DiscoFeature(NS_SI)] + [disco.DiscoFeature(u"http://jabber.org/protocol/si/profile/{}".format(profile_name)) for profile_name in self.plugin_parent.si_profiles]
 
     def getDiscoItems(self, requestor, target, nodeIdentifier=''):
         return []
--- a/src/plugins/plugin_xep_0096.py	Wed Nov 11 18:19:49 2015 +0100
+++ b/src/plugins/plugin_xep_0096.py	Wed Nov 11 18:19:49 2015 +0100
@@ -21,18 +21,17 @@
 from sat.core.constants import Const as C
 from sat.core.log import getLogger
 log = getLogger(__name__)
+from sat.core import exceptions
 from twisted.words.xish import domish
 from twisted.words.protocols.jabber import jid
-from twisted.words.protocols import jabber
+from twisted.words.protocols.jabber import error
 import os
-from twisted.internet import reactor
-from twisted.python import failure
+
 
-from wokkel import data_form
-
+NS_SI_FT = "http://jabber.org/protocol/si/profile/file-transfer"
 IQ_SET = '/iq[@type="set"]'
-PROFILE_NAME = "file-transfer"
-PROFILE = "http://jabber.org/protocol/si/profile/" + PROFILE_NAME
+SI_PROFILE_NAME = "file-transfer"
+SI_PROFILE = "http://jabber.org/protocol/si/profile/" + SI_PROFILE_NAME
 
 PLUGIN_INFO = {
     "name": "XEP-0096 Plugin",
@@ -53,84 +52,125 @@
         self.host = host
         self.managed_stream_m = [self.host.plugins["XEP-0065"].NAMESPACE,
                                  self.host.plugins["XEP-0047"].NAMESPACE]  # Stream methods managed
-        self.host.plugins["XEP-0095"].registerSIProfile(PROFILE_NAME, self.transferRequest)
-        host.bridge.addMethod("sendFile", ".plugin", in_sign='ssa{ss}s', out_sign='s', method=self.sendFile)
+        self._f = self.host.plugins["FILE"]
+        self._si = self.host.plugins["XEP-0095"]
+        self._si.registerSIProfile(SI_PROFILE_NAME, self._transferRequest)
+        host.bridge.addMethod("siSendFile", ".plugin", in_sign='sssss', out_sign='s', method=self._sendFile)
+
+    def unload(self):
+        self._si.unregisterSIProfile(SI_PROFILE_NAME)
+
+    def _badRequest(self, iq_elt, message=None, profile=C.PROF_KEY_NONE):
+        """Send a bad-request error
 
-    def profileConnected(self, profile):
-        client = self.host.getClient(profile)
-        client._xep_0096_waiting_for_approval = {}  # key = id, value = [transfer data, IdelayedCall Reactor timeout,
-                                        # current stream method, [failed stream methods], profile]
+        @param iq_elt(domish.Element): initial <IQ> element of the SI request
+        @param message(None, unicode): informational message to display in the logs
+        @param profile: %(doc_profile)s
+        """
+        if message is not None:
+            log.warning(message)
+        self._si.sendError(iq_elt, 'bad-request', profile)
 
-    def _kill_id(self, approval_id, profile):
-        """Delete a waiting_for_approval id, called after timeout
-        @param approval_id: id of _xep_0096_waiting_for_approval"""
-        log.info(_("SI File Transfer: TimeOut reached for id %s") % approval_id)
+    def _parseRange(self, parent_elt, file_size):
+        """find and parse <range/> element
+
+        @param parent_elt(domish.Element): direct parent of the <range/> element
+        @return (tuple[bool, int, int]): a tuple with
+            - True if range is required
+            - range_offset
+            - range_length
+        """
         try:
-            client = self.host.getClient(profile)
-            del client._xep_0096_waiting_for_approval[approval_id]
-        except KeyError:
-            log.warning(_("kill id called on a non existant approval id"))
+            range_elt = parent_elt.elements(NS_SI_FT, 'range').next()
+        except StopIteration:
+            range_ = False
+            range_offset = None
+            range_length = None
+        else:
+            range_ = True
 
-    def transferRequest(self, iq_id, from_jid, si_id, si_mime_type, si_el, profile):
+            try:
+                range_offset = int(range_elt['offset'])
+            except KeyError:
+                range_offset = 0
+
+            try:
+                range_length = int(range_elt['length'])
+            except KeyError:
+                range_length = file_size
+
+            if range_offset != 0 or range_length != file_size:
+                raise NotImplementedError # FIXME
+
+        return range_, range_offset, range_length
+
+    def _transferRequest(self, iq_elt, si_id, si_mime_type, si_elt, profile):
         """Called when a file transfer is requested
-        @param iq_id: id of the iq request
-        @param from_jid: jid of the sender
-        @param si_id: Stream Initiation session id
-        @param si_mime_type: Mime type of the file (or default "application/octet-stream" if unknown)
-        @param si_el: domish.Element of the request
-        @param profile: %(doc_profile)s"""
+
+        @param iq_elt(domish.Element): initial <IQ> element of the SI request
+        @param si_id(unicode): Stream Initiation session id
+        @param si_mime_type("unicode"): Mime type of the file (or default "application/octet-stream" if unknown)
+        @param si_elt(domish.Element): request
+        @param profile: %(doc_profile)s
+        """
         log.info(_("XEP-0096 file transfer requested"))
-        log.debug(si_el.toXml())
-        client = self.host.getClient(profile)
-        filename = ""
-        file_size = ""
-        file_date = None
-        file_hash = None
-        file_desc = ""
-        can_range = False
-        file_elts = filter(lambda elt: elt.name == 'file', si_el.elements())
-        feature_elts = self.host.plugins["XEP-0020"].getFeatureElt(si_el)
+        peer_jid = jid.JID(iq_elt['from'])
+
+        try:
+            file_elt = si_elt.elements(NS_SI_FT, "file").next()
+        except StopIteration:
+            return self._badRequest(iq_elt, "No <file/> element found in SI File Transfer request", profile)
+
+        try:
+            feature_elt = self.host.plugins["XEP-0020"].getFeatureElt(si_elt)
+        except exceptions.NotFound:
+            return self._badRequest(iq_elt, "No <feature/> element found in SI File Transfer request", profile)
+
+        try:
+            filename = file_elt["name"]
+            file_size = int(file_elt["size"])
+        except (KeyError, ValueError):
+            return self._badRequest(iq_elt, "Malformed SI File Transfer request", profile)
+
+        file_date = file_elt.getAttribute("date")
+        file_hash = file_elt.getAttribute("hash")
+
+        log.info(u"File proposed: name=[{name}] size={size}".format(name=filename, size=file_size))
 
-        if file_elts:
-            file_el = file_elts[0]
-            filename = file_el["name"]
-            file_size = file_el["size"]
-            file_date = file_el.getAttribute("date", "")
-            file_hash = file_el.getAttribute("hash", "")
-            log.info(_(u"File proposed: name=[%(name)s] size=%(size)s") % {'name': filename, 'size': file_size})
-            for file_child_el in file_el.elements():
-                if file_child_el.name == "desc":
-                    file_desc = unicode(file_child_el)
-                elif file_child_el.name == "range":
-                    can_range = True
-        else:
-            log.warning(_("No file element found"))
-            self.host.plugins["XEP-0095"].sendBadRequestError(iq_id, from_jid, profile)
-            return
+        try:
+            file_desc = unicode(file_elt.elements(NS_SI_FT, 'desc').next())
+        except StopIteration:
+            file_desc = ''
+
+        try:
+            range_, range_offset, range_length = self._parseRange(file_elt, file_size)
+        except ValueError:
+            return self._badRequest(iq_elt, "Malformed SI File Transfer request", profile)
 
-        if feature_elts:
-            feature_el = feature_elts[0]
-            data_form.Form.fromElement(feature_el.firstChildElement())
-            try:
-                stream_method = self.host.plugins["XEP-0020"].negociate(feature_el, 'stream-method', self.managed_stream_m)
-            except KeyError:
-                log.warning(_("No stream method found"))
-                self.host.plugins["XEP-0095"].sendBadRequestError(iq_id, from_jid, profile)
-                return
-            if not stream_method:
-                log.warning(_("Can't find a valid stream method"))
-                self.host.plugins["XEP-0095"].sendFailedError(iq_id, from_jid, profile)
-                return
+        try:
+            stream_method = self.host.plugins["XEP-0020"].negotiate(feature_elt, 'stream-method', self.managed_stream_m, namespace=None)
+        except KeyError:
+            return self._badRequest(iq_elt, "No stream method found", profile)
+
+        if stream_method:
+            if stream_method == self.host.plugins["XEP-0065"].NAMESPACE:
+                plugin = self.host.plugins["XEP-0065"]
+            elif stream_method == self.host.plugins["XEP-0047"].NAMESPACE:
+                plugin = self.host.plugins["XEP-0047"]
+            else:
+                log.error(_("Unknown stream method, this should not happen at this stage, cancelling transfer"))
         else:
-            log.warning(_("No feature element found"))
-            self.host.plugins["XEP-0095"].sendBadRequestError(iq_id, from_jid, profile)
+            log.warning(_("Can't find a valid stream method"))
+            self._si.sendError(iq_elt, 'not-acceptable', profile)
             return
 
         #if we are here, the transfer can start, we just need user's agreement
-        data = {"filename": filename, "id": iq_id, "from": from_jid, "size": file_size, "date": file_date, "hash": file_hash, "desc": file_desc, "can_range": str(can_range)}
-        client._xep_0096_waiting_for_approval[si_id] = [data, reactor.callLater(300, self._kill_id, si_id, profile), stream_method, []]
+        data = {"name": filename, "peer_jid": peer_jid, "size": file_size, "date": file_date, "hash": file_hash, "desc": file_desc,
+                "range": range_, "range_offset": range_offset, "range_length": range_length,
+                "si_id": si_id, "stream_method": stream_method, "stream_plugin": plugin}
 
-        self.host.askConfirmation(si_id, "FILE_TRANSFER", data, self.confirmationCB, profile)
+        d = self._f.getDestDir(peer_jid, data, data, profile)
+        d.addCallback(self.confirmationCb, iq_elt, data, profile)
 
     def _getFileObject(self, dest_path, can_range=False):
         """Open file, put file pointer to the end if the file if needed
@@ -139,178 +179,176 @@
         @return: File Object"""
         return open(dest_path, "ab" if can_range else "wb")
 
-    def confirmationCB(self, sid, accepted, frontend_data, profile):
+    def confirmationCb(self, accepted, iq_elt, data, profile):
         """Called on confirmation answer
-        @param sid: file transfer session id
-        @param accepted: True if file transfer is accepted
-        @param frontend_data: data sent by frontend"""
-        client = self.host.getClient(profile)
-        data, timeout, stream_method, failed_methods = client._xep_0096_waiting_for_approval[sid]
-        can_range = data['can_range'] == "True"
-        range_offset = 0
-        if accepted:
-            if timeout.active():
-                timeout.cancel()
-            try:
-                dest_path = frontend_data['dest_path']
-            except KeyError:
-                log.error(_('dest path not found in frontend_data'))
-                del client._xep_0096_waiting_for_approval[sid]
-                return
-            if stream_method == self.host.plugins["XEP-0065"].NAMESPACE:
-                plugin = self.host.plugins["XEP-0065"]
-            elif stream_method == self.host.plugins["XEP-0047"].NAMESPACE:
-                plugin = self.host.plugins["XEP-0047"]
-            else:
-                log.error(_("Unknown stream method, this should not happen at this stage, cancelling transfer"))
-                del client._xep_0096_waiting_for_approval[sid]
-                return
 
-            file_obj = self._getFileObject(dest_path, can_range)
-            range_offset = file_obj.tell()
-            d = plugin.createSession(file_obj, jid.JID(data['from']), sid, int(data["size"]), profile)
-            d.addCallback(self._transferSucceeded, sid, file_obj, stream_method, profile)
-            d.addErrback(self._transferFailed, sid, file_obj, stream_method, profile)
+        @param accepted(bool): True if file transfer is accepted
+        @param iq_elt(domish.Element): initial SI request
+        @param data(dict): session data
+        @param profile: %(doc_profile)s
+        """
+        if not accepted:
+            log.info(u"File transfer declined")
+            self._si.sendError(iq_elt, 'forbidden', profile)
+            return
+        # data, timeout, stream_method, failed_methods = client._xep_0096_waiting_for_approval[sid]
+        # can_range = data['can_range'] == "True"
+        # range_offset = 0
+        # if timeout.active():
+        #     timeout.cancel()
+        # try:
+        #     dest_path = frontend_data['dest_path']
+        # except KeyError:
+        #     log.error(_('dest path not found in frontend_data'))
+        #     del client._xep_0096_waiting_for_approval[sid]
+        #     return
+        # if stream_method == self.host.plugins["XEP-0065"].NAMESPACE:
+        #     plugin = self.host.plugins["XEP-0065"]
+        # elif stream_method == self.host.plugins["XEP-0047"].NAMESPACE:
+        #     plugin = self.host.plugins["XEP-0047"]
+        # else:
+        #     log.error(_("Unknown stream method, this should not happen at this stage, cancelling transfer"))
+        #     del client._xep_0096_waiting_for_approval[sid]
+        #     return
 
-            #we can send the iq result
-            feature_elt = self.host.plugins["XEP-0020"].chooseOption({'stream-method': stream_method})
-            misc_elts = []
-            misc_elts.append(domish.Element((PROFILE, "file")))
-            if can_range:
-                range_elt = domish.Element((None, "range"))
-                range_elt['offset'] = str(range_offset)
-                #TODO: manage range length
-                misc_elts.append(range_elt)
-            self.host.plugins["XEP-0095"].acceptStream(data["id"], data['from'], feature_elt, misc_elts, profile)
-        else:
-            log.debug(_(u"Transfer [%s] refused") % sid)
-            self.host.plugins["XEP-0095"].sendRejectedError(data["id"], data['from'], profile=profile)
-            del(client._xep_0096_waiting_for_approval[sid])
+        # file_obj = self._getFileObject(dest_path, can_range)
+        # range_offset = file_obj.tell()
+        d = data['stream_plugin'].createSession(data['file_obj'], data['peer_jid'], data['si_id'], profile=profile)
+        d.addCallback(self._transferCb, data, profile)
+        d.addErrback(self._transferEb, data, profile)
 
-    def _transferSucceeded(self, dummy, sid, file_obj, stream_method, profile):
-        self.transferSucceeded(sid, file_obj, stream_method, profile)
+        #we can send the iq result
+        feature_elt = self.host.plugins["XEP-0020"].chooseOption({'stream-method': data['stream_method']}, namespace=None)
+        misc_elts = []
+        misc_elts.append(domish.Element((SI_PROFILE, "file")))
+        # if can_range:
+        #     range_elt = domish.Element((None, "range"))
+        #     range_elt['offset'] = str(range_offset)
+        #     #TODO: manage range length
+        #     misc_elts.append(range_elt)
+        self._si.acceptStream(iq_elt, feature_elt, misc_elts, profile)
 
-    def transferSucceeded(self, dummy, sid, file_obj, stream_method, profile):
+    def _transferCb(self, dummy, data, profile):
         """Called by the stream method when transfer successfuly finished
-        @param id: stream id"""
-        client = self.host.getClient(profile)
-        file_obj.close()
-        log.info(_('Transfer %s successfuly finished') % sid)
-        del(client._xep_0096_waiting_for_approval[sid])
 
-    def _transferFailed(self, sid, file_obj, stream_method, reason, profile):
-        self.transferFailed(failure.Failure(Exception(reason)), sid, file_obj, stream_method, profile)
+        @param data: session data
+        @param profile: %(doc_profile)s
+        """
+        #TODO: check hash
+        data['file_obj'].close()
+        log.info(u'Transfer {si_id} successfuly finished'.format(**data))
 
-    def transferFailed(self, failure, sid, file_obj, stream_method, profile):
+    def _transferEb(self, failure, data, profile):
         """Called when something went wrong with the transfer
 
         @param id: stream id
+        @param data: session data
+        @param profile: %(doc_profile)s
+        """
+        log.warning(u'Transfer {si_id} failed: {reason}'.format(reason=unicode(failure.condition), **data))
+        data['file_obj'].close()
+
+    def _sendFile(self, peer_jid_s, filepath, name, desc, profile=C.PROF_KEY_NONE):
+        return self.sendFile(jid.JID(peer_jid_s), filepath, name or None, desc or None, profile)
+
+    def sendFile(self, peer_jid, filepath, name=None, desc=None, profile=C.PROF_KEY_NONE):
+        """Send a file using XEP-0096
+
+        @param peer_jid(jid.JID): recipient
+        @param filepath(str): absolute path to the file to send
+        @param name(unicode): name of the file to send
+            name must not contain "/" characters
+        @param desc: description of the file
+        @param profile: %(doc_profile)s
+        @return: an unique id to identify the transfer
         """
         client = self.host.getClient(profile)
-        data, timeout, stream_method, failed_methods = client._xep_0096_waiting_for_approval[sid]
-        log.warning(_(u'Transfer %(id)s failed with stream method %(s_method)s: %(reason)s') % {
-            'id': sid,
-            's_method': stream_method,
-            'reason': unicode(failure)})
-        filepath = file_obj.name
-        file_obj.close()
-        os.remove(filepath)
-        #TODO: session remenber (within a time limit) when a stream method fail, and avoid that stream method with full jid for the rest of the session
-        log.warning(_("All stream methods failed, can't transfer the file"))
-        del(client._xep_0096_waiting_for_approval[sid])
+        feature_elt = self.host.plugins["XEP-0020"].proposeFeatures({'stream-method': self.managed_stream_m}, namespace=None)
+
+        file_transfer_elts = []
+
+        statinfo = os.stat(filepath)
+        file_elt = domish.Element((SI_PROFILE, 'file'))
+        file_elt['name'] = name or os.path.basename(filepath)
+        assert '/' not in file_elt['name']
+        size = statinfo.st_size
+        file_elt['size'] = str(size)
+        if desc:
+            file_elt.addElement('desc', content=desc)
+        file_transfer_elts.append(file_elt)
 
-    def fileCb(self, filepath, sid, size, profile, IQ):
-        if IQ['type'] == "error":
-            stanza_err = jabber.error.exceptionFromStanza(IQ)
-            if stanza_err.code == '403' and stanza_err.condition == 'forbidden':
-                log.debug(_(u"File transfer refused by %s") % IQ['from'])
-                self.host.bridge.newAlert(_("The contact %s refused your file") % IQ['from'], _("File refused"), "INFO", profile)
-            else:
-                log.warning(_(u"Error during file transfer with %s") % IQ['from'])
-                self.host.bridge.newAlert(_("Something went wrong during the file transfer session intialisation with %s") % IQ['from'], _("File transfer error"), "ERROR", profile)
+        file_transfer_elts.append(domish.Element((None, 'range')))
+
+        sid, offer_d = self._si.proposeStream(peer_jid, SI_PROFILE, feature_elt, file_transfer_elts, profile=client.profile)
+        args = [filepath, sid, size, client]
+        offer_d.addCallbacks(self._fileCb, self._fileEb, args, None, args)
+        return sid
+
+    def _fileCb(self, result_tuple, filepath, sid, size, client):
+        iq_elt, si_elt = result_tuple
+
+        try:
+            feature_elt = self.host.plugins["XEP-0020"].getFeatureElt(si_elt)
+        except exceptions.NotFound:
+            log.warning(u"No <feature/> element found in result while expected")
             return
 
-        si_elt = IQ.firstChildElement()
-
-        if IQ['type'] != "result" or not si_elt or si_elt.name != "si":
-            log.error(_("Protocol error during file transfer"))
-            return
-
-        feature_elts = self.host.plugins["XEP-0020"].getFeatureElt(si_elt)
-        if not feature_elts:
-            log.warning(_("No feature element"))
-            return
-
-        choosed_options = self.host.plugins["XEP-0020"].getChoosedOptions(feature_elts[0])
+        choosed_options = self.host.plugins["XEP-0020"].getChoosedOptions(feature_elt, namespace=None)
         try:
             stream_method = choosed_options["stream-method"]
         except KeyError:
-            log.warning(_("No stream method choosed"))
+            log.warning(u"No stream method choosed")
             return
 
-        range_offset = 0
-        # range_length = None
-        range_elts = filter(lambda elt: elt.name == 'range', si_elt.elements())
-        if range_elts:
-            range_elt = range_elts[0]
-            range_offset = range_elt.getAttribute("offset", 0)
-            # range_length = range_elt.getAttribute("length")
+        try:
+            file_elt = si_elt.elements(NS_SI_FT, "file").next()
+        except StopIteration:
+            pass
+        else:
+            range_, range_offset, range_length = self._parseRange(file_elt, size)
 
         if stream_method == self.host.plugins["XEP-0065"].NAMESPACE:
             plugin = self.host.plugins["XEP-0065"]
         elif stream_method == self.host.plugins["XEP-0047"].NAMESPACE:
             plugin = self.host.plugins["XEP-0047"]
         else:
-            log.error(u"Invalid stream method received")
+            log.warning(u"Invalid stream method received")
             return
 
-        file_obj = open(filepath, 'r')
-        if range_offset:
-            file_obj.seek(range_offset)
-        d = plugin.startStream(file_obj, jid.JID(IQ['from']), sid, profile=profile)
-        d.addCallback(self.sendSuccessCb, sid, file_obj, stream_method, profile)
-        d.addErrback(self.sendFailureCb, sid, file_obj, stream_method, profile)
+        file_obj = self._f.File(self.host,
+                                filepath,
+                                size=size,
+                                profile=client.profile
+                                )
+        d = plugin.startStream(file_obj, jid.JID(iq_elt['from']), sid, profile=client.profile)
+        d.addCallback(self._sendCb, sid, file_obj, client.profile)
+        d.addErrback(self._sendEb, sid, file_obj, client.profile)
 
-    def sendFile(self, to_jid, filepath, data={}, profile_key=C.PROF_KEY_NONE):
-        """send a file using XEP-0096
-        @to_jid: recipient
-        @filepath: absolute path to the file to send
-        @data: dictionnary with the optional following keys:
-               - "description": description of the file
-        @param profile_key: %(doc_profile_key)s
-        @return: an unique id to identify the transfer
-        """
-        profile = self.host.memory.getProfileName(profile_key)
-        if not profile:
-            log.warning(_("Trying to send a file from an unknown profile"))
-            return ""
-        feature_elt = self.host.plugins["XEP-0020"].proposeFeatures({'stream-method': self.managed_stream_m})
-
-        file_transfer_elts = []
+    def _fileEb(self, failure, filepath, sid, size, client):
+        if failure.check(error.StanzaError):
+            stanza_err = failure.value
+            if stanza_err.code == '403' and stanza_err.condition == 'forbidden':
+                from_s = stanza_err.stanza['from']
+                log.info(u"File transfer refused by {}".format(from_s))
+                self.host.bridge.newAlert(_("The contact {} has refused your file").format(from_s), _("File refused"), "INFO", client.profile)
+            else:
+                log.warning(_(u"Error during file transfer"))
+                self.host.bridge.newAlert(_(u"Something went wrong during the file transfer session intialisation: {reason}").format(reason=unicode(stanza_err.condition)), _("File transfer error"), "ERROR", client.profile)
+        elif failure.check(exceptions.DataError):
+            log.warning(u'Invalid stanza received')
+        else:
+            log.error(u'Error while proposing stream: {}'.format(failure))
 
-        statinfo = os.stat(filepath)
-        file_elt = domish.Element((PROFILE, 'file'))
-        file_elt['name'] = os.path.basename(filepath)
-        size = file_elt['size'] = str(statinfo.st_size)
-        file_transfer_elts.append(file_elt)
-
-        file_transfer_elts.append(domish.Element((None, 'range')))
-
-        sid, offer = self.host.plugins["XEP-0095"].proposeStream(jid.JID(to_jid), PROFILE, feature_elt, file_transfer_elts, profile_key=profile)
-        offer.addCallback(self.fileCb, filepath, sid, size, profile)
-        return sid
-
-    def _sendSuccessCb(self, sid, file_obj, stream_method, profile):
-        self.sendSuccessCb(sid, file_obj, stream_method, profile)
-
-    def sendSuccessCb(self, dummy, sid, file_obj, stream_method, profile):
-        log.info(_(u'Transfer %(sid)s successfuly finished [%(profile)s]')
-             % {"sid": sid, "profile": profile})
+    def _sendCb(self, dummy, sid, file_obj, profile):
+        log.info(_(u'transfer {sid} successfuly finished [{profile}]').format(
+            sid=sid,
+            profile=profile))
         file_obj.close()
 
-    def _sendFailureCb(self, sid, file_obj, stream_method, reason, profile):
-        self.sendFailureCb(failure.Failure(Exception(reason)), sid, file_obj, stream_method, profile)
-
-    def sendFailureCb(self, failure, sid, file_obj, stream_method, profile):
+    def _sendEb(self, failure, sid, file_obj, profile):
+        log.warning(_(u'transfer {sid} failed [{profile}]: {reason}').format(
+            sid=sid,
+            profile=profile,
+            reason=unicode(failure.condition),
+            ))
         file_obj.close()
-        log.warning(_(u'Transfer %(id)s failed with stream method %(s_method)s: %(reason)s [%(profile)s]') % {'id': sid, "s_method": stream_method, 'reason': unicode(failure), 'profile': profile})