diff src/plugins/plugin_xep_0096.py @ 1577:d04d7402b8e9

plugins XEP-0020, XEP-0065, XEP-0095, XEP-0096: fixed file copy with Stream Initiation: /!\ range is not working yet /!\ pipe plugin is broken for now
author Goffi <goffi@goffi.org>
date Wed, 11 Nov 2015 18:19:49 +0100
parents 7cc29634b6ef
children d46aae87c03a
line wrap: on
line diff
--- a/src/plugins/plugin_xep_0096.py	Wed Nov 11 18:19:49 2015 +0100
+++ b/src/plugins/plugin_xep_0096.py	Wed Nov 11 18:19:49 2015 +0100
@@ -21,18 +21,17 @@
 from sat.core.constants import Const as C
 from sat.core.log import getLogger
 log = getLogger(__name__)
+from sat.core import exceptions
 from twisted.words.xish import domish
 from twisted.words.protocols.jabber import jid
-from twisted.words.protocols import jabber
+from twisted.words.protocols.jabber import error
 import os
-from twisted.internet import reactor
-from twisted.python import failure
+
 
-from wokkel import data_form
-
+NS_SI_FT = "http://jabber.org/protocol/si/profile/file-transfer"
 IQ_SET = '/iq[@type="set"]'
-PROFILE_NAME = "file-transfer"
-PROFILE = "http://jabber.org/protocol/si/profile/" + PROFILE_NAME
+SI_PROFILE_NAME = "file-transfer"
+SI_PROFILE = "http://jabber.org/protocol/si/profile/" + SI_PROFILE_NAME
 
 PLUGIN_INFO = {
     "name": "XEP-0096 Plugin",
@@ -53,84 +52,125 @@
         self.host = host
         self.managed_stream_m = [self.host.plugins["XEP-0065"].NAMESPACE,
                                  self.host.plugins["XEP-0047"].NAMESPACE]  # Stream methods managed
-        self.host.plugins["XEP-0095"].registerSIProfile(PROFILE_NAME, self.transferRequest)
-        host.bridge.addMethod("sendFile", ".plugin", in_sign='ssa{ss}s', out_sign='s', method=self.sendFile)
+        self._f = self.host.plugins["FILE"]
+        self._si = self.host.plugins["XEP-0095"]
+        self._si.registerSIProfile(SI_PROFILE_NAME, self._transferRequest)
+        host.bridge.addMethod("siSendFile", ".plugin", in_sign='sssss', out_sign='s', method=self._sendFile)
+
+    def unload(self):
+        self._si.unregisterSIProfile(SI_PROFILE_NAME)
+
+    def _badRequest(self, iq_elt, message=None, profile=C.PROF_KEY_NONE):
+        """Send a bad-request error
 
-    def profileConnected(self, profile):
-        client = self.host.getClient(profile)
-        client._xep_0096_waiting_for_approval = {}  # key = id, value = [transfer data, IdelayedCall Reactor timeout,
-                                        # current stream method, [failed stream methods], profile]
+        @param iq_elt(domish.Element): initial <IQ> element of the SI request
+        @param message(None, unicode): informational message to display in the logs
+        @param profile: %(doc_profile)s
+        """
+        if message is not None:
+            log.warning(message)
+        self._si.sendError(iq_elt, 'bad-request', profile)
 
-    def _kill_id(self, approval_id, profile):
-        """Delete a waiting_for_approval id, called after timeout
-        @param approval_id: id of _xep_0096_waiting_for_approval"""
-        log.info(_("SI File Transfer: TimeOut reached for id %s") % approval_id)
+    def _parseRange(self, parent_elt, file_size):
+        """find and parse <range/> element
+
+        @param parent_elt(domish.Element): direct parent of the <range/> element
+        @return (tuple[bool, int, int]): a tuple with
+            - True if range is required
+            - range_offset
+            - range_length
+        """
         try:
-            client = self.host.getClient(profile)
-            del client._xep_0096_waiting_for_approval[approval_id]
-        except KeyError:
-            log.warning(_("kill id called on a non existant approval id"))
+            range_elt = parent_elt.elements(NS_SI_FT, 'range').next()
+        except StopIteration:
+            range_ = False
+            range_offset = None
+            range_length = None
+        else:
+            range_ = True
 
-    def transferRequest(self, iq_id, from_jid, si_id, si_mime_type, si_el, profile):
+            try:
+                range_offset = int(range_elt['offset'])
+            except KeyError:
+                range_offset = 0
+
+            try:
+                range_length = int(range_elt['length'])
+            except KeyError:
+                range_length = file_size
+
+            if range_offset != 0 or range_length != file_size:
+                raise NotImplementedError # FIXME
+
+        return range_, range_offset, range_length
+
+    def _transferRequest(self, iq_elt, si_id, si_mime_type, si_elt, profile):
         """Called when a file transfer is requested
-        @param iq_id: id of the iq request
-        @param from_jid: jid of the sender
-        @param si_id: Stream Initiation session id
-        @param si_mime_type: Mime type of the file (or default "application/octet-stream" if unknown)
-        @param si_el: domish.Element of the request
-        @param profile: %(doc_profile)s"""
+
+        @param iq_elt(domish.Element): initial <IQ> element of the SI request
+        @param si_id(unicode): Stream Initiation session id
+        @param si_mime_type("unicode"): Mime type of the file (or default "application/octet-stream" if unknown)
+        @param si_elt(domish.Element): request
+        @param profile: %(doc_profile)s
+        """
         log.info(_("XEP-0096 file transfer requested"))
-        log.debug(si_el.toXml())
-        client = self.host.getClient(profile)
-        filename = ""
-        file_size = ""
-        file_date = None
-        file_hash = None
-        file_desc = ""
-        can_range = False
-        file_elts = filter(lambda elt: elt.name == 'file', si_el.elements())
-        feature_elts = self.host.plugins["XEP-0020"].getFeatureElt(si_el)
+        peer_jid = jid.JID(iq_elt['from'])
+
+        try:
+            file_elt = si_elt.elements(NS_SI_FT, "file").next()
+        except StopIteration:
+            return self._badRequest(iq_elt, "No <file/> element found in SI File Transfer request", profile)
+
+        try:
+            feature_elt = self.host.plugins["XEP-0020"].getFeatureElt(si_elt)
+        except exceptions.NotFound:
+            return self._badRequest(iq_elt, "No <feature/> element found in SI File Transfer request", profile)
+
+        try:
+            filename = file_elt["name"]
+            file_size = int(file_elt["size"])
+        except (KeyError, ValueError):
+            return self._badRequest(iq_elt, "Malformed SI File Transfer request", profile)
+
+        file_date = file_elt.getAttribute("date")
+        file_hash = file_elt.getAttribute("hash")
+
+        log.info(u"File proposed: name=[{name}] size={size}".format(name=filename, size=file_size))
 
-        if file_elts:
-            file_el = file_elts[0]
-            filename = file_el["name"]
-            file_size = file_el["size"]
-            file_date = file_el.getAttribute("date", "")
-            file_hash = file_el.getAttribute("hash", "")
-            log.info(_(u"File proposed: name=[%(name)s] size=%(size)s") % {'name': filename, 'size': file_size})
-            for file_child_el in file_el.elements():
-                if file_child_el.name == "desc":
-                    file_desc = unicode(file_child_el)
-                elif file_child_el.name == "range":
-                    can_range = True
-        else:
-            log.warning(_("No file element found"))
-            self.host.plugins["XEP-0095"].sendBadRequestError(iq_id, from_jid, profile)
-            return
+        try:
+            file_desc = unicode(file_elt.elements(NS_SI_FT, 'desc').next())
+        except StopIteration:
+            file_desc = ''
+
+        try:
+            range_, range_offset, range_length = self._parseRange(file_elt, file_size)
+        except ValueError:
+            return self._badRequest(iq_elt, "Malformed SI File Transfer request", profile)
 
-        if feature_elts:
-            feature_el = feature_elts[0]
-            data_form.Form.fromElement(feature_el.firstChildElement())
-            try:
-                stream_method = self.host.plugins["XEP-0020"].negociate(feature_el, 'stream-method', self.managed_stream_m)
-            except KeyError:
-                log.warning(_("No stream method found"))
-                self.host.plugins["XEP-0095"].sendBadRequestError(iq_id, from_jid, profile)
-                return
-            if not stream_method:
-                log.warning(_("Can't find a valid stream method"))
-                self.host.plugins["XEP-0095"].sendFailedError(iq_id, from_jid, profile)
-                return
+        try:
+            stream_method = self.host.plugins["XEP-0020"].negotiate(feature_elt, 'stream-method', self.managed_stream_m, namespace=None)
+        except KeyError:
+            return self._badRequest(iq_elt, "No stream method found", profile)
+
+        if stream_method:
+            if stream_method == self.host.plugins["XEP-0065"].NAMESPACE:
+                plugin = self.host.plugins["XEP-0065"]
+            elif stream_method == self.host.plugins["XEP-0047"].NAMESPACE:
+                plugin = self.host.plugins["XEP-0047"]
+            else:
+                log.error(_("Unknown stream method, this should not happen at this stage, cancelling transfer"))
         else:
-            log.warning(_("No feature element found"))
-            self.host.plugins["XEP-0095"].sendBadRequestError(iq_id, from_jid, profile)
+            log.warning(_("Can't find a valid stream method"))
+            self._si.sendError(iq_elt, 'not-acceptable', profile)
             return
 
         #if we are here, the transfer can start, we just need user's agreement
-        data = {"filename": filename, "id": iq_id, "from": from_jid, "size": file_size, "date": file_date, "hash": file_hash, "desc": file_desc, "can_range": str(can_range)}
-        client._xep_0096_waiting_for_approval[si_id] = [data, reactor.callLater(300, self._kill_id, si_id, profile), stream_method, []]
+        data = {"name": filename, "peer_jid": peer_jid, "size": file_size, "date": file_date, "hash": file_hash, "desc": file_desc,
+                "range": range_, "range_offset": range_offset, "range_length": range_length,
+                "si_id": si_id, "stream_method": stream_method, "stream_plugin": plugin}
 
-        self.host.askConfirmation(si_id, "FILE_TRANSFER", data, self.confirmationCB, profile)
+        d = self._f.getDestDir(peer_jid, data, data, profile)
+        d.addCallback(self.confirmationCb, iq_elt, data, profile)
 
     def _getFileObject(self, dest_path, can_range=False):
         """Open file, put file pointer to the end if the file if needed
@@ -139,178 +179,176 @@
         @return: File Object"""
         return open(dest_path, "ab" if can_range else "wb")
 
-    def confirmationCB(self, sid, accepted, frontend_data, profile):
+    def confirmationCb(self, accepted, iq_elt, data, profile):
         """Called on confirmation answer
-        @param sid: file transfer session id
-        @param accepted: True if file transfer is accepted
-        @param frontend_data: data sent by frontend"""
-        client = self.host.getClient(profile)
-        data, timeout, stream_method, failed_methods = client._xep_0096_waiting_for_approval[sid]
-        can_range = data['can_range'] == "True"
-        range_offset = 0
-        if accepted:
-            if timeout.active():
-                timeout.cancel()
-            try:
-                dest_path = frontend_data['dest_path']
-            except KeyError:
-                log.error(_('dest path not found in frontend_data'))
-                del client._xep_0096_waiting_for_approval[sid]
-                return
-            if stream_method == self.host.plugins["XEP-0065"].NAMESPACE:
-                plugin = self.host.plugins["XEP-0065"]
-            elif stream_method == self.host.plugins["XEP-0047"].NAMESPACE:
-                plugin = self.host.plugins["XEP-0047"]
-            else:
-                log.error(_("Unknown stream method, this should not happen at this stage, cancelling transfer"))
-                del client._xep_0096_waiting_for_approval[sid]
-                return
 
-            file_obj = self._getFileObject(dest_path, can_range)
-            range_offset = file_obj.tell()
-            d = plugin.createSession(file_obj, jid.JID(data['from']), sid, int(data["size"]), profile)
-            d.addCallback(self._transferSucceeded, sid, file_obj, stream_method, profile)
-            d.addErrback(self._transferFailed, sid, file_obj, stream_method, profile)
+        @param accepted(bool): True if file transfer is accepted
+        @param iq_elt(domish.Element): initial SI request
+        @param data(dict): session data
+        @param profile: %(doc_profile)s
+        """
+        if not accepted:
+            log.info(u"File transfer declined")
+            self._si.sendError(iq_elt, 'forbidden', profile)
+            return
+        # data, timeout, stream_method, failed_methods = client._xep_0096_waiting_for_approval[sid]
+        # can_range = data['can_range'] == "True"
+        # range_offset = 0
+        # if timeout.active():
+        #     timeout.cancel()
+        # try:
+        #     dest_path = frontend_data['dest_path']
+        # except KeyError:
+        #     log.error(_('dest path not found in frontend_data'))
+        #     del client._xep_0096_waiting_for_approval[sid]
+        #     return
+        # if stream_method == self.host.plugins["XEP-0065"].NAMESPACE:
+        #     plugin = self.host.plugins["XEP-0065"]
+        # elif stream_method == self.host.plugins["XEP-0047"].NAMESPACE:
+        #     plugin = self.host.plugins["XEP-0047"]
+        # else:
+        #     log.error(_("Unknown stream method, this should not happen at this stage, cancelling transfer"))
+        #     del client._xep_0096_waiting_for_approval[sid]
+        #     return
 
-            #we can send the iq result
-            feature_elt = self.host.plugins["XEP-0020"].chooseOption({'stream-method': stream_method})
-            misc_elts = []
-            misc_elts.append(domish.Element((PROFILE, "file")))
-            if can_range:
-                range_elt = domish.Element((None, "range"))
-                range_elt['offset'] = str(range_offset)
-                #TODO: manage range length
-                misc_elts.append(range_elt)
-            self.host.plugins["XEP-0095"].acceptStream(data["id"], data['from'], feature_elt, misc_elts, profile)
-        else:
-            log.debug(_(u"Transfer [%s] refused") % sid)
-            self.host.plugins["XEP-0095"].sendRejectedError(data["id"], data['from'], profile=profile)
-            del(client._xep_0096_waiting_for_approval[sid])
+        # file_obj = self._getFileObject(dest_path, can_range)
+        # range_offset = file_obj.tell()
+        d = data['stream_plugin'].createSession(data['file_obj'], data['peer_jid'], data['si_id'], profile=profile)
+        d.addCallback(self._transferCb, data, profile)
+        d.addErrback(self._transferEb, data, profile)
 
-    def _transferSucceeded(self, dummy, sid, file_obj, stream_method, profile):
-        self.transferSucceeded(sid, file_obj, stream_method, profile)
+        #we can send the iq result
+        feature_elt = self.host.plugins["XEP-0020"].chooseOption({'stream-method': data['stream_method']}, namespace=None)
+        misc_elts = []
+        misc_elts.append(domish.Element((SI_PROFILE, "file")))
+        # if can_range:
+        #     range_elt = domish.Element((None, "range"))
+        #     range_elt['offset'] = str(range_offset)
+        #     #TODO: manage range length
+        #     misc_elts.append(range_elt)
+        self._si.acceptStream(iq_elt, feature_elt, misc_elts, profile)
 
-    def transferSucceeded(self, dummy, sid, file_obj, stream_method, profile):
+    def _transferCb(self, dummy, data, profile):
         """Called by the stream method when transfer successfuly finished
-        @param id: stream id"""
-        client = self.host.getClient(profile)
-        file_obj.close()
-        log.info(_('Transfer %s successfuly finished') % sid)
-        del(client._xep_0096_waiting_for_approval[sid])
 
-    def _transferFailed(self, sid, file_obj, stream_method, reason, profile):
-        self.transferFailed(failure.Failure(Exception(reason)), sid, file_obj, stream_method, profile)
+        @param data: session data
+        @param profile: %(doc_profile)s
+        """
+        #TODO: check hash
+        data['file_obj'].close()
+        log.info(u'Transfer {si_id} successfuly finished'.format(**data))
 
-    def transferFailed(self, failure, sid, file_obj, stream_method, profile):
+    def _transferEb(self, failure, data, profile):
         """Called when something went wrong with the transfer
 
         @param id: stream id
+        @param data: session data
+        @param profile: %(doc_profile)s
+        """
+        log.warning(u'Transfer {si_id} failed: {reason}'.format(reason=unicode(failure.condition), **data))
+        data['file_obj'].close()
+
+    def _sendFile(self, peer_jid_s, filepath, name, desc, profile=C.PROF_KEY_NONE):
+        return self.sendFile(jid.JID(peer_jid_s), filepath, name or None, desc or None, profile)
+
+    def sendFile(self, peer_jid, filepath, name=None, desc=None, profile=C.PROF_KEY_NONE):
+        """Send a file using XEP-0096
+
+        @param peer_jid(jid.JID): recipient
+        @param filepath(str): absolute path to the file to send
+        @param name(unicode): name of the file to send
+            name must not contain "/" characters
+        @param desc: description of the file
+        @param profile: %(doc_profile)s
+        @return: an unique id to identify the transfer
         """
         client = self.host.getClient(profile)
-        data, timeout, stream_method, failed_methods = client._xep_0096_waiting_for_approval[sid]
-        log.warning(_(u'Transfer %(id)s failed with stream method %(s_method)s: %(reason)s') % {
-            'id': sid,
-            's_method': stream_method,
-            'reason': unicode(failure)})
-        filepath = file_obj.name
-        file_obj.close()
-        os.remove(filepath)
-        #TODO: session remenber (within a time limit) when a stream method fail, and avoid that stream method with full jid for the rest of the session
-        log.warning(_("All stream methods failed, can't transfer the file"))
-        del(client._xep_0096_waiting_for_approval[sid])
+        feature_elt = self.host.plugins["XEP-0020"].proposeFeatures({'stream-method': self.managed_stream_m}, namespace=None)
+
+        file_transfer_elts = []
+
+        statinfo = os.stat(filepath)
+        file_elt = domish.Element((SI_PROFILE, 'file'))
+        file_elt['name'] = name or os.path.basename(filepath)
+        assert '/' not in file_elt['name']
+        size = statinfo.st_size
+        file_elt['size'] = str(size)
+        if desc:
+            file_elt.addElement('desc', content=desc)
+        file_transfer_elts.append(file_elt)
 
-    def fileCb(self, filepath, sid, size, profile, IQ):
-        if IQ['type'] == "error":
-            stanza_err = jabber.error.exceptionFromStanza(IQ)
-            if stanza_err.code == '403' and stanza_err.condition == 'forbidden':
-                log.debug(_(u"File transfer refused by %s") % IQ['from'])
-                self.host.bridge.newAlert(_("The contact %s refused your file") % IQ['from'], _("File refused"), "INFO", profile)
-            else:
-                log.warning(_(u"Error during file transfer with %s") % IQ['from'])
-                self.host.bridge.newAlert(_("Something went wrong during the file transfer session intialisation with %s") % IQ['from'], _("File transfer error"), "ERROR", profile)
+        file_transfer_elts.append(domish.Element((None, 'range')))
+
+        sid, offer_d = self._si.proposeStream(peer_jid, SI_PROFILE, feature_elt, file_transfer_elts, profile=client.profile)
+        args = [filepath, sid, size, client]
+        offer_d.addCallbacks(self._fileCb, self._fileEb, args, None, args)
+        return sid
+
+    def _fileCb(self, result_tuple, filepath, sid, size, client):
+        iq_elt, si_elt = result_tuple
+
+        try:
+            feature_elt = self.host.plugins["XEP-0020"].getFeatureElt(si_elt)
+        except exceptions.NotFound:
+            log.warning(u"No <feature/> element found in result while expected")
             return
 
-        si_elt = IQ.firstChildElement()
-
-        if IQ['type'] != "result" or not si_elt or si_elt.name != "si":
-            log.error(_("Protocol error during file transfer"))
-            return
-
-        feature_elts = self.host.plugins["XEP-0020"].getFeatureElt(si_elt)
-        if not feature_elts:
-            log.warning(_("No feature element"))
-            return
-
-        choosed_options = self.host.plugins["XEP-0020"].getChoosedOptions(feature_elts[0])
+        choosed_options = self.host.plugins["XEP-0020"].getChoosedOptions(feature_elt, namespace=None)
         try:
             stream_method = choosed_options["stream-method"]
         except KeyError:
-            log.warning(_("No stream method choosed"))
+            log.warning(u"No stream method choosed")
             return
 
-        range_offset = 0
-        # range_length = None
-        range_elts = filter(lambda elt: elt.name == 'range', si_elt.elements())
-        if range_elts:
-            range_elt = range_elts[0]
-            range_offset = range_elt.getAttribute("offset", 0)
-            # range_length = range_elt.getAttribute("length")
+        try:
+            file_elt = si_elt.elements(NS_SI_FT, "file").next()
+        except StopIteration:
+            pass
+        else:
+            range_, range_offset, range_length = self._parseRange(file_elt, size)
 
         if stream_method == self.host.plugins["XEP-0065"].NAMESPACE:
             plugin = self.host.plugins["XEP-0065"]
         elif stream_method == self.host.plugins["XEP-0047"].NAMESPACE:
             plugin = self.host.plugins["XEP-0047"]
         else:
-            log.error(u"Invalid stream method received")
+            log.warning(u"Invalid stream method received")
             return
 
-        file_obj = open(filepath, 'r')
-        if range_offset:
-            file_obj.seek(range_offset)
-        d = plugin.startStream(file_obj, jid.JID(IQ['from']), sid, profile=profile)
-        d.addCallback(self.sendSuccessCb, sid, file_obj, stream_method, profile)
-        d.addErrback(self.sendFailureCb, sid, file_obj, stream_method, profile)
+        file_obj = self._f.File(self.host,
+                                filepath,
+                                size=size,
+                                profile=client.profile
+                                )
+        d = plugin.startStream(file_obj, jid.JID(iq_elt['from']), sid, profile=client.profile)
+        d.addCallback(self._sendCb, sid, file_obj, client.profile)
+        d.addErrback(self._sendEb, sid, file_obj, client.profile)
 
-    def sendFile(self, to_jid, filepath, data={}, profile_key=C.PROF_KEY_NONE):
-        """send a file using XEP-0096
-        @to_jid: recipient
-        @filepath: absolute path to the file to send
-        @data: dictionnary with the optional following keys:
-               - "description": description of the file
-        @param profile_key: %(doc_profile_key)s
-        @return: an unique id to identify the transfer
-        """
-        profile = self.host.memory.getProfileName(profile_key)
-        if not profile:
-            log.warning(_("Trying to send a file from an unknown profile"))
-            return ""
-        feature_elt = self.host.plugins["XEP-0020"].proposeFeatures({'stream-method': self.managed_stream_m})
-
-        file_transfer_elts = []
+    def _fileEb(self, failure, filepath, sid, size, client):
+        if failure.check(error.StanzaError):
+            stanza_err = failure.value
+            if stanza_err.code == '403' and stanza_err.condition == 'forbidden':
+                from_s = stanza_err.stanza['from']
+                log.info(u"File transfer refused by {}".format(from_s))
+                self.host.bridge.newAlert(_("The contact {} has refused your file").format(from_s), _("File refused"), "INFO", client.profile)
+            else:
+                log.warning(_(u"Error during file transfer"))
+                self.host.bridge.newAlert(_(u"Something went wrong during the file transfer session intialisation: {reason}").format(reason=unicode(stanza_err.condition)), _("File transfer error"), "ERROR", client.profile)
+        elif failure.check(exceptions.DataError):
+            log.warning(u'Invalid stanza received')
+        else:
+            log.error(u'Error while proposing stream: {}'.format(failure))
 
-        statinfo = os.stat(filepath)
-        file_elt = domish.Element((PROFILE, 'file'))
-        file_elt['name'] = os.path.basename(filepath)
-        size = file_elt['size'] = str(statinfo.st_size)
-        file_transfer_elts.append(file_elt)
-
-        file_transfer_elts.append(domish.Element((None, 'range')))
-
-        sid, offer = self.host.plugins["XEP-0095"].proposeStream(jid.JID(to_jid), PROFILE, feature_elt, file_transfer_elts, profile_key=profile)
-        offer.addCallback(self.fileCb, filepath, sid, size, profile)
-        return sid
-
-    def _sendSuccessCb(self, sid, file_obj, stream_method, profile):
-        self.sendSuccessCb(sid, file_obj, stream_method, profile)
-
-    def sendSuccessCb(self, dummy, sid, file_obj, stream_method, profile):
-        log.info(_(u'Transfer %(sid)s successfuly finished [%(profile)s]')
-             % {"sid": sid, "profile": profile})
+    def _sendCb(self, dummy, sid, file_obj, profile):
+        log.info(_(u'transfer {sid} successfuly finished [{profile}]').format(
+            sid=sid,
+            profile=profile))
         file_obj.close()
 
-    def _sendFailureCb(self, sid, file_obj, stream_method, reason, profile):
-        self.sendFailureCb(failure.Failure(Exception(reason)), sid, file_obj, stream_method, profile)
-
-    def sendFailureCb(self, failure, sid, file_obj, stream_method, profile):
+    def _sendEb(self, failure, sid, file_obj, profile):
+        log.warning(_(u'transfer {sid} failed [{profile}]: {reason}').format(
+            sid=sid,
+            profile=profile,
+            reason=unicode(failure.condition),
+            ))
         file_obj.close()
-        log.warning(_(u'Transfer %(id)s failed with stream method %(s_method)s: %(reason)s [%(profile)s]') % {'id': sid, "s_method": stream_method, 'reason': unicode(failure), 'profile': profile})