changeset 1571:c668081eba1c

plugins XEP-0234, XEP-0260, XEP-0261: jingle session termination is managed by application (XEP-0234) instead of transport
author Goffi <goffi@goffi.org>
date Sun, 08 Nov 2015 14:48:04 +0100
parents 37d4be4a9fed
children 6a6fe840c3a6
files src/plugins/plugin_xep_0234.py src/plugins/plugin_xep_0260.py src/plugins/plugin_xep_0261.py
diffstat 3 files changed, 51 insertions(+), 33 deletions(-) [+]
line wrap: on
line diff
--- a/src/plugins/plugin_xep_0234.py	Sun Nov 08 14:44:33 2015 +0100
+++ b/src/plugins/plugin_xep_0234.py	Sun Nov 08 14:48:04 2015 +0100
@@ -31,6 +31,7 @@
 from twisted.words.protocols.jabber import jid
 from twisted.python import failure
 from twisted.words.protocols.jabber.xmlstream import XMPPHandler
+from twisted.internet import defer
 
 
 NS_JINGLE_FT = 'urn:xmpp:jingle:apps:file-transfer:4'
@@ -52,6 +53,7 @@
 
 
 class XEP_0234(object):
+    # TODO: assure everything is closed when file is sent or session terminate is received
 
     def __init__(self, host):
         log.info(_("plugin Jingle File Transfer initialization"))
@@ -81,11 +83,12 @@
     # Dialogs with user
     # the overwrite check is done here
 
-    def _getDestDir(self, session, content_data, profile):
+    def _getDestDir(self, session, content_name, content_data, profile):
         """Request confirmation and destination dir to user
 
         if transfer is confirmed, session is filled
         @param session(dict): jingle session data
+        @param content_name(unicode): name of the jingle content
         @param content_data(dict): content informations
         @param profile: %(doc_profile)s
         return (defer.Deferred): True if transfer is accepted
@@ -98,23 +101,31 @@
             type_=C.XMLUI_DIALOG_FILE,
             options={C.XMLUI_DATA_FILETYPE: C.XMLUI_DATA_FILETYPE_DIR},
             profile=profile)
-        d.addCallback(self._gotConfirmation, session, content_data, application_data, profile)
+        d.addCallback(self._gotConfirmation, session, content_name, content_data, application_data, profile)
         return d
 
-    def _openFileWrite(self, content_data, file_path, file_data, profile):
+    def _openFileWrite(self, session, content_name, content_data, file_path, file_data, profile):
         assert 'file_obj' not in content_data
-        content_data['file_obj'] = self._f.File(
+        file_obj = content_data['file_obj'] = self._f.File(
             self.host,
             file_path,
             'w',
             size=file_data['size'],
             profile=profile,
             )
+        finished_d = content_data['finished_d'] = defer.Deferred()
+        args = [file_obj, session, content_name, content_data, profile]
+        finished_d.addCallbacks(self._finishedCb, self._finishedEb, args, None, args)
 
-    def _gotConfirmation(self, data, session, content_data, application_data, profile):
+    def _gotConfirmation(self, data, session, content_name, content_data, application_data, profile):
         """Called when the permission and dest path have been received
 
         @param data(dict): xmlui data received from file dialog
+        @param session(dict): jingle session data
+        @param content_name(unicode): name of the jingle content
+        @param content_data(dict): content session_data
+        @param content_data(dict): application session data
+        @param profile: %(doc_profile)s
         return (bool): True if copy is wanted and OK
             False if user wants to cancel
             if fill exists ask confirmation and call again self._getDestDir if needed
@@ -130,10 +141,10 @@
         if os.path.exists(file_path):
             def check_overwrite(overwrite):
                 if overwrite:
-                    self._openFileWrite(content_data, file_path, file_data, profile)
+                    self._openFileWrite(session, content_name, content_data, file_path, file_data, profile)
                     return True
                 else:
-                    return self._getDestDir(session, content_data, profile)
+                    return self._getDestDir(session, content_name, content_data, profile)
 
             exists_d = xml_tools.deferConfirm(
                 self.host,
@@ -143,7 +154,7 @@
             exists_d.addCallback(check_overwrite)
             return exists_d
 
-        self._openFileWrite(content_data, file_path, file_data, profile)
+        self._openFileWrite(session, content_name, content_data, file_path, file_data, profile)
         return True
 
     # jingle callbacks
@@ -203,7 +214,7 @@
         content_data['application_data']['file_data'] = file_data
 
         # now we actualy request permission to user
-        return self._getDestDir(session, content_data, profile)
+        return self._getDestDir(session, content_name, content_data, profile)
 
 
     def jingleHandler(self, action, session, content_name, desc_elt, profile):
@@ -228,11 +239,26 @@
                                                   size=size,
                                                   profile=profile
                                                   )
-            file_obj.eof.addCallback(lambda dummy: file_obj.close())
+            finished_d = content_data['finished_d'] = defer.Deferred()
+            args = [file_obj, session, content_name, content_data, profile]
+            finished_d.addCallbacks(self._finishedCb, self._finishedEb, args, None, args)
         else:
             log.warning(u"FIXME: unmanaged action {}".format(action))
         return desc_elt
 
+    def _finishedCb(self, dummy, file_obj, session, content_name, content_data, profile):
+        log.debug(u"File transfer completed successfuly")
+        if content_data['senders'] != session['role']:
+            # we terminate the session only if we are the received,
+            # as recommanded in XEP-0234 ยง2 (after example 6)
+            self._j.contentTerminate(session, content_name, profile=profile)
+        file_obj.close()
+
+    def _finishedEb(self, failure, file_obj, session, content_name, content_data, profile):
+        log.warning(u"Error while streaming through s5b: {}".format(failure))
+        file_obj.close()
+        self._j.contentTerminate(session, content_name, reason=self._j.REASON_FAILED_TRANSPORT, profile=profile)
+
 
 class XEP_0234_handler(XMPPHandler):
     implements(iwokkel.IDisco)
--- a/src/plugins/plugin_xep_0260.py	Sun Nov 08 14:44:33 2015 +0100
+++ b/src/plugins/plugin_xep_0260.py	Sun Nov 08 14:48:04 2015 +0100
@@ -221,9 +221,17 @@
         if choosed_candidate is None:
             log.warning(u"Socks5 negociation failed, we need to fallback to IBB")
         else:
-            # best_peer_candidate was choosed from the candidates we have sent
-            # so our_candidate is true if choosed_candidate is peer_best_candidate
-            our_candidate = choosed_candidate==peer_best_candidate
+            if choosed_candidate == peer_best_candidate:
+                # peer_best_candidate was choosed from the candidates we have sent
+                # so our_candidate is true if choosed_candidate is peer_best_candidate
+                our_candidate = True
+                # than also mean that best_candidate must be discarded !
+                try:
+                    best_candidate.discard()
+                except AttributeError: # but it can be None
+                    pass
+            else:
+                our_candidate = False
 
             log.info(u"Socks5 negociation successful, {who} candidate will be used: {candidate}".format(
                 who = u'our' if our_candidate else u'other peer',
@@ -326,8 +334,7 @@
             peer_candidates = transport_data['peer_candidates']
             file_obj = content_data['file_obj']
             stream_d = self._s5b.registerHash(session_hash, file_obj, profile)
-            args = [session, content_name, profile]
-            stream_d.addCallbacks(self._streamCb, self._streamEb, args, None, args)
+            stream_d.chainDeferred(content_data['finished_d'])
             d = self._s5b.getBestCandidate(peer_candidates, session_hash, profile)
             d.addCallback(self._foundPeerCandidate, session, transport_data, content_name, client)
 
@@ -340,8 +347,7 @@
             peer_candidates = transport_data['peer_candidates'] = self._parseCandidates(transport_elt)
             file_obj = content_data['file_obj']
             stream_d = self._s5b.registerHash(session_hash, file_obj, profile)
-            args = [session, content_name, profile]
-            stream_d.addCallbacks(self._streamCb, self._streamEb, args, None, args)
+            stream_d.chainDeferred(content_data['finished_d'])
             d = self._s5b.getBestCandidate(peer_candidates, session_hash, profile)
             d.addCallback(self._foundPeerCandidate, session, transport_data, content_name, client)
             candidates = yield self._s5b.getCandidates(profile)
@@ -375,13 +381,6 @@
 
         defer.returnValue(transport_elt)
 
-    def _streamCb(self, dummy, session, content_name, profile):
-        self._j.contentTerminate(session, content_name, profile=profile)
-
-    def _streamEb(self, failure, session, content_name, profile):
-        log.warning(u"Error while streaming through s5b: {}".format(failure))
-        self._j.contentTerminate(session, content_name, reason=self._j.REASON_FAILED_TRANSPORT, profile=profile)
-
 
 class XEP_0260_handler(XMPPHandler):
     implements(iwokkel.IDisco)
--- a/src/plugins/plugin_xep_0261.py	Sun Nov 08 14:44:33 2015 +0100
+++ b/src/plugins/plugin_xep_0261.py	Sun Nov 08 14:48:04 2015 +0100
@@ -77,24 +77,17 @@
             peer_jid = session['peer_jid']
             sid = transport_data['sid']
             file_obj = content_data['file_obj']
-            args = [session, content_name, profile]
             if action == self._j.A_START:
                 block_size = transport_data['block_size']
                 d = self._ibb.startStream(file_obj, peer_jid, sid, block_size, profile)
-                d.addErrback(self._streamEb, *args)
+                d.chainDeferred(content_data['finished_d'])
             else:
                 d = self._ibb.createSession(file_obj, peer_jid, sid, profile)
-                d.addCallbacks(self._streamCb, self._streamEb, args, None, args)
+                d.chainDeferred(content_data['finished_d'])
         else:
             log.warning(u"FIXME: unmanaged action {}".format(action))
         return transport_elt
 
-    def _streamCb(self, dummy, session, content_name, profile):
-        self._j.contentTerminate(session, content_name, profile=profile)
-
-    def _streamEb(self, failure, session, content_name, profile):
-        log.warning(u"Error while streaming in-band: {}".format(failure))
-        self._j.contentTerminate(session, content_name, reason=self._j.REASON_FAILED_TRANSPORT, profile=profile)
 
 class XEP_0261_handler(XMPPHandler):
     implements(iwokkel.IDisco)