diff src/plugins/plugin_xep_0234.py @ 2489:e2a7bb875957

plugin pipe/stream, file transfert: refactoring and improvments: this is a big patch as things had to be changed at the same time. - changed methods using profile argument to use client instead - move SatFile in a new tools.stream module, has it should be part of core, not a plugin - new IStreamProducer interface, to handler starting a pull producer - new FileStreamObject which create a stream producer/consumer from a SatFile - plugin pipe is no more using unix named pipe, as it complicate the thing, special care need to be taken to not block, and it's generally not necessary. Instead a socket is now used, so the plugin has been renomed to jingle stream. - bad connection/error should be better handler in jingle stream plugin, and code should not block anymore - jp pipe commands have been updated accordingly fix bug 237
author Goffi <goffi@goffi.org>
date Thu, 08 Feb 2018 00:37:42 +0100
parents 0046283a285d
children 7ad5f2c4e34a
line wrap: on
line diff
--- a/src/plugins/plugin_xep_0234.py	Thu Feb 01 07:24:34 2018 +0100
+++ b/src/plugins/plugin_xep_0234.py	Thu Feb 08 00:37:42 2018 +0100
@@ -25,6 +25,7 @@
 from wokkel import disco, iwokkel
 from zope.interface import implements
 from sat.tools import utils
+from sat.tools import stream
 import os.path
 from twisted.words.xish import domish
 from twisted.words.protocols.jabber import jid
@@ -76,33 +77,33 @@
         return u'{}_{}'.format(session['id'], content_name)
 
     def _fileJingleSend(self, peer_jid, filepath, name="", file_desc="", profile=C.PROF_KEY_NONE):
-        return self.fileJingleSend(jid.JID(peer_jid), filepath, name or None, file_desc or None, profile)
+        client = self.host.getClient(profile)
+        return self.fileJingleSend(client, jid.JID(peer_jid), filepath, name or None, file_desc or None)
 
-    def fileJingleSend(self, peer_jid, filepath, name, file_desc=None, profile=C.PROF_KEY_NONE):
+    def fileJingleSend(self, client, peer_jid, filepath, name, file_desc=None):
         """Send a file using jingle file transfer
 
         @param peer_jid(jid.JID): destinee jid
         @param filepath(str): absolute path of the file
         @param name(unicode, None): name of the file
         @param file_desc(unicode, None): description of the file
-        @param profile: %(doc_profile)s
         @return (D(unicode)): progress id
         """
         progress_id_d = defer.Deferred()
-        self._j.initiate(peer_jid,
+        self._j.initiate(client,
+                         peer_jid,
                          [{'app_ns': NS_JINGLE_FT,
                            'senders': self._j.ROLE_INITIATOR,
                            'app_kwargs': {'filepath': filepath,
                                           'name': name,
                                           'file_desc': file_desc,
                                           'progress_id_d': progress_id_d},
-                         }],
-                         profile=profile)
+                         }])
         return progress_id_d
 
     # jingle callbacks
 
-    def jingleSessionInit(self, session, content_name, filepath, name, file_desc, progress_id_d, profile=C.PROF_KEY_NONE):
+    def jingleSessionInit(self, client, session, content_name, filepath, name, file_desc, progress_id_d):
         progress_id_d.callback(self._getProgressId(session, content_name))
         content_data = session['contents'][content_name]
         application_data = content_data['application_data']
@@ -122,7 +123,7 @@
         file_elt.addChild(self._hash.buildHashElt())
         return desc_elt
 
-    def jingleRequestConfirmation(self, action, session, content_name, desc_elt, profile):
+    def jingleRequestConfirmation(self, client, action, session, content_name, desc_elt):
         """This method request confirmation for a jingle session"""
         content_data = session['contents'][content_name]
         if content_data['senders'] not in (self._j.ROLE_INITIATOR, self._j.ROLE_RESPONDER):
@@ -166,15 +167,15 @@
         def gotConfirmation(confirmed):
             if confirmed:
                 finished_d = content_data['finished_d'] = defer.Deferred()
-                args = [session, content_name, content_data, profile]
+                args = [client, session, content_name, content_data]
                 finished_d.addCallbacks(self._finishedCb, self._finishedEb, args, None, args)
             return confirmed
 
-        d = self._f.getDestDir(session['peer_jid'], content_data, file_data, profile)
+        d = self._f.getDestDir(client, session['peer_jid'], content_data, file_data, stream_object=True)
         d.addCallback(gotConfirmation)
         return d
 
-    def jingleHandler(self, action, session, content_name, desc_elt, profile):
+    def jingleHandler(self, client, action, session, content_name, desc_elt):
         content_data = session['contents'][content_name]
         application_data = content_data['application_data']
         if action in (self._j.A_ACCEPTED_ACK,):
@@ -188,27 +189,28 @@
                 log.debug("adding <range> element")
                 file_elt.addElement('range')
         elif action == self._j.A_SESSION_ACCEPT:
-            assert not 'file_obj' in content_data
+            assert not 'stream_object' in content_data
             file_data = application_data['file_data']
             file_path = application_data['file_path']
             size = file_data['size']
             # XXX: hash security is not critical here, so we just take the higher mandatory one
             hasher = file_data['hash_hasher'] = self._hash.getHasher('sha-256')
-            content_data['file_obj'] = self._f.File(self.host,
-                                       file_path,
-                                       uid=self._getProgressId(session, content_name),
-                                       size=size,
-                                       data_cb=lambda data: hasher.update(data),
-                                       profile=profile
-                                       )
+            content_data['stream_object'] = stream.FileStreamObject(
+                self.host,
+                client,
+                file_path,
+                uid=self._getProgressId(session, content_name),
+                size=size,
+                data_cb=lambda data: hasher.update(data),
+                )
             finished_d = content_data['finished_d'] = defer.Deferred()
-            args = [session, content_name, content_data, profile]
+            args = [client, session, content_name, content_data]
             finished_d.addCallbacks(self._finishedCb, self._finishedEb, args, None, args)
         else:
             log.warning(u"FIXME: unmanaged action {}".format(action))
         return desc_elt
 
-    def jingleSessionInfo(self, action, session, content_name, jingle_elt, profile):
+    def jingleSessionInfo(self, client, action, session, content_name, jingle_elt):
         """Called on session-info action
 
         manage checksum, and ignore <received/> element
@@ -240,17 +242,17 @@
                     log.warning(u"Hash algorithm used in given hash ({peer_algo}) doesn't correspond to the one we have used ({our_algo})"
                         .format(peer_algo=algo, our_algo=file_data.get('hash_algo')))
                 else:
-                    self._receiverTryTerminate(session, content_name, content_data, profile=profile)
+                    self._receiverTryTerminate(client, session, content_name, content_data)
             else:
                 raise NotImplementedError
 
-    def _sendCheckSum(self, session, content_name, content_data, profile):
+    def _sendCheckSum(self, client, session, content_name, content_data):
         """Send the session-info with the hash checksum"""
         file_data = content_data['application_data']['file_data']
         hasher = file_data['hash_hasher']
         hash_ = hasher.hexdigest()
         log.debug(u"Calculated hash: {}".format(hash_))
-        iq_elt, jingle_elt = self._j.buildSessionInfo(session, profile)
+        iq_elt, jingle_elt = self._j.buildSessionInfo(client, session)
         checksum_elt = jingle_elt.addElement((NS_JINGLE_FT, 'checksum'))
         checksum_elt['creator'] = content_data['creator']
         checksum_elt['name'] = content_name
@@ -258,7 +260,7 @@
         file_elt.addChild(self._hash.buildHashElt(hash_))
         iq_elt.send()
 
-    def _receiverTryTerminate(self, session, content_name, content_data, last_try=False, profile=C.PROF_KEY_NONE):
+    def _receiverTryTerminate(self, client, session, content_name, content_data, last_try=False):
         """Try to terminate the session
 
         This method must only be used by the receiver.
@@ -274,8 +276,8 @@
         if hash_given is None:
             if last_try:
                 log.warning(u"sender didn't sent hash checksum, we can't check the file")
-                self._j.delayedContentTerminate(session, content_name, profile=profile)
-                content_data['file_obj'].close()
+                self._j.delayedContentTerminate(client, session, content_name)
+                content_data['stream_object'].close()
                 return True
             return False
         hasher = file_data['hash_hasher']
@@ -296,8 +298,8 @@
                 given = hash_given,
                 our = hash_)
 
-        self._j.delayedContentTerminate(session, content_name, profile=profile)
-        content_data['file_obj'].close(progress_metadata, error)
+        self._j.delayedContentTerminate(client, session, content_name)
+        content_data['stream_object'].close(progress_metadata, error)
         # we may have the last_try timer still active, so we try to cancel it
         try:
             content_data['last_try_timer'].cancel()
@@ -305,25 +307,25 @@
             pass
         return True
 
-    def _finishedCb(self, dummy, session, content_name, content_data, profile):
+    def _finishedCb(self, dummy, client, session, content_name, content_data):
         log.info(u"File transfer terminated")
         if content_data['senders'] != session['role']:
             # we terminate the session only if we are the receiver,
             # as recommanded in XEP-0234 ยง2 (after example 6)
             content_data['transfer_finished'] = True
-            if not self._receiverTryTerminate(session, content_name, content_data, profile=profile):
+            if not self._receiverTryTerminate(client, session, content_name, content_data):
                 # we have not received the hash yet, we wait 5 more seconds
                 content_data['last_try_timer'] = reactor.callLater(
-                    5, self._receiverTryTerminate, session, content_name, content_data, last_try=True, profile=profile)
+                    5, self._receiverTryTerminate, client, session, content_name, content_data, last_try=True)
         else:
             # we are the sender, we send the checksum
-            self._sendCheckSum(session, content_name, content_data, profile)
-            content_data['file_obj'].close()
+            self._sendCheckSum(client, session, content_name, content_data)
+            content_data['stream_object'].close()
 
-    def _finishedEb(self, failure, session, content_name, content_data, profile):
+    def _finishedEb(self, failure, client, session, content_name, content_data):
         log.warning(u"Error while streaming file: {}".format(failure))
-        content_data['file_obj'].close()
-        self._j.contentTerminate(session, content_name, reason=self._j.REASON_FAILED_TRANSPORT, profile=profile)
+        content_data['stream_object'].close()
+        self._j.contentTerminate(client, session, content_name, reason=self._j.REASON_FAILED_TRANSPORT)
 
 
 class XEP_0234_handler(XMPPHandler):