diff src/plugins/plugin_xep_0096.py @ 2489:e2a7bb875957

plugin pipe/stream, file transfert: refactoring and improvments: this is a big patch as things had to be changed at the same time. - changed methods using profile argument to use client instead - move SatFile in a new tools.stream module, has it should be part of core, not a plugin - new IStreamProducer interface, to handler starting a pull producer - new FileStreamObject which create a stream producer/consumer from a SatFile - plugin pipe is no more using unix named pipe, as it complicate the thing, special care need to be taken to not block, and it's generally not necessary. Instead a socket is now used, so the plugin has been renomed to jingle stream. - bad connection/error should be better handler in jingle stream plugin, and code should not block anymore - jp pipe commands have been updated accordingly fix bug 237
author Goffi <goffi@goffi.org>
date Thu, 08 Feb 2018 00:37:42 +0100
parents 0046283a285d
children 7ad5f2c4e34a
line wrap: on
line diff
--- a/src/plugins/plugin_xep_0096.py	Thu Feb 01 07:24:34 2018 +0100
+++ b/src/plugins/plugin_xep_0096.py	Thu Feb 08 00:37:42 2018 +0100
@@ -23,6 +23,7 @@
 log = getLogger(__name__)
 from sat.core import exceptions
 from sat.tools import xml_tools
+from sat.tools import stream
 from twisted.words.xish import domish
 from twisted.words.protocols.jabber import jid
 from twisted.words.protocols.jabber import error
@@ -63,16 +64,15 @@
     def unload(self):
         self._si.unregisterSIProfile(SI_PROFILE_NAME)
 
-    def _badRequest(self, iq_elt, message=None, profile=C.PROF_KEY_NONE):
+    def _badRequest(self, client, iq_elt, message=None):
         """Send a bad-request error
 
         @param iq_elt(domish.Element): initial <IQ> element of the SI request
         @param message(None, unicode): informational message to display in the logs
-        @param profile: %(doc_profile)s
         """
         if message is not None:
             log.warning(message)
-        self._si.sendError(iq_elt, 'bad-request', profile)
+        self._si.sendError(client, iq_elt, 'bad-request')
 
     def _parseRange(self, parent_elt, file_size):
         """find and parse <range/> element
@@ -107,14 +107,13 @@
 
         return range_, range_offset, range_length
 
-    def _transferRequest(self, iq_elt, si_id, si_mime_type, si_elt, profile):
+    def _transferRequest(self, client, iq_elt, si_id, si_mime_type, si_elt):
         """Called when a file transfer is requested
 
         @param iq_elt(domish.Element): initial <IQ> element of the SI request
         @param si_id(unicode): Stream Initiation session id
         @param si_mime_type("unicode"): Mime type of the file (or default "application/octet-stream" if unknown)
         @param si_elt(domish.Element): request
-        @param profile: %(doc_profile)s
         """
         log.info(_("XEP-0096 file transfer requested"))
         peer_jid = jid.JID(iq_elt['from'])
@@ -122,18 +121,18 @@
         try:
             file_elt = si_elt.elements(NS_SI_FT, "file").next()
         except StopIteration:
-            return self._badRequest(iq_elt, "No <file/> element found in SI File Transfer request", profile)
+            return self._badRequest(client, iq_elt, "No <file/> element found in SI File Transfer request")
 
         try:
             feature_elt = self.host.plugins["XEP-0020"].getFeatureElt(si_elt)
         except exceptions.NotFound:
-            return self._badRequest(iq_elt, "No <feature/> element found in SI File Transfer request", profile)
+            return self._badRequest(client, iq_elt, "No <feature/> element found in SI File Transfer request")
 
         try:
             filename = file_elt["name"]
             file_size = int(file_elt["size"])
         except (KeyError, ValueError):
-            return self._badRequest(iq_elt, "Malformed SI File Transfer request", profile)
+            return self._badRequest(client, iq_elt, "Malformed SI File Transfer request")
 
         file_date = file_elt.getAttribute("date")
         file_hash = file_elt.getAttribute("hash")
@@ -148,12 +147,12 @@
         try:
             range_, range_offset, range_length = self._parseRange(file_elt, file_size)
         except ValueError:
-            return self._badRequest(iq_elt, "Malformed SI File Transfer request", profile)
+            return self._badRequest(client, iq_elt, "Malformed SI File Transfer request")
 
         try:
             stream_method = self.host.plugins["XEP-0020"].negotiate(feature_elt, 'stream-method', self.managed_stream_m, namespace=None)
         except KeyError:
-            return self._badRequest(iq_elt, "No stream method found", profile)
+            return self._badRequest(client, iq_elt, "No stream method found")
 
         if stream_method:
             if stream_method == self.host.plugins["XEP-0065"].NAMESPACE:
@@ -164,7 +163,7 @@
                 log.error(u"Unknown stream method, this should not happen at this stage, cancelling transfer")
         else:
             log.warning(u"Can't find a valid stream method")
-            self._si.sendError(iq_elt, 'not-acceptable', profile)
+            self._si.sendError(client, iq_elt, 'not-acceptable')
             return
 
         #if we are here, the transfer can start, we just need user's agreement
@@ -172,27 +171,19 @@
                 "range": range_, "range_offset": range_offset, "range_length": range_length,
                 "si_id": si_id, "progress_id": si_id, "stream_method": stream_method, "stream_plugin": plugin}
 
-        d = self._f.getDestDir(peer_jid, data, data, profile)
-        d.addCallback(self.confirmationCb, iq_elt, data, profile)
+        d = self._f.getDestDir(client, peer_jid, data, data, stream_object=True)
+        d.addCallback(self.confirmationCb, client, iq_elt, data)
 
-    def _getFileObject(self, dest_path, can_range=False):
-        """Open file, put file pointer to the end if the file if needed
-        @param dest_path: path of the destination file
-        @param can_range: True if the file pointer can be moved
-        @return: File Object"""
-        return open(dest_path, "ab" if can_range else "wb")
-
-    def confirmationCb(self, accepted, iq_elt, data, profile):
+    def confirmationCb(self, accepted, client, iq_elt, data):
         """Called on confirmation answer
 
         @param accepted(bool): True if file transfer is accepted
         @param iq_elt(domish.Element): initial SI request
         @param data(dict): session data
-        @param profile: %(doc_profile)s
         """
         if not accepted:
             log.info(u"File transfer declined")
-            self._si.sendError(iq_elt, 'forbidden', profile)
+            self._si.sendError(client, iq_elt, 'forbidden')
             return
         # data, timeout, stream_method, failed_methods = client._xep_0096_waiting_for_approval[sid]
         # can_range = data['can_range'] == "True"
@@ -216,9 +207,9 @@
 
         # file_obj = self._getFileObject(dest_path, can_range)
         # range_offset = file_obj.tell()
-        d = data['stream_plugin'].createSession(data['file_obj'], data['peer_jid'], data['si_id'], profile=profile)
-        d.addCallback(self._transferCb, data, profile)
-        d.addErrback(self._transferEb, data, profile)
+        d = data['stream_plugin'].createSession(client, data['stream_object'], data['peer_jid'], data['si_id'])
+        d.addCallback(self._transferCb, client, data)
+        d.addErrback(self._transferEb, client, data)
 
         #we can send the iq result
         feature_elt = self.host.plugins["XEP-0020"].chooseOption({'stream-method': data['stream_method']}, namespace=None)
@@ -229,32 +220,31 @@
         #     range_elt['offset'] = str(range_offset)
         #     #TODO: manage range length
         #     misc_elts.append(range_elt)
-        self._si.acceptStream(iq_elt, feature_elt, misc_elts, profile)
+        self._si.acceptStream(client, iq_elt, feature_elt, misc_elts)
 
-    def _transferCb(self, dummy, data, profile):
+    def _transferCb(self, dummy, client, data):
         """Called by the stream method when transfer successfuly finished
 
         @param data: session data
-        @param profile: %(doc_profile)s
         """
         #TODO: check hash
-        data['file_obj'].close()
+        data['stream_object'].close()
         log.info(u'Transfer {si_id} successfuly finished'.format(**data))
 
-    def _transferEb(self, failure, data, profile):
+    def _transferEb(self, failure, client, data):
         """Called when something went wrong with the transfer
 
         @param id: stream id
         @param data: session data
-        @param profile: %(doc_profile)s
         """
         log.warning(u'Transfer {si_id} failed: {reason}'.format(reason=unicode(failure.value), **data))
-        data['file_obj'].close()
+        data['stream_object'].close()
 
     def _sendFile(self, peer_jid_s, filepath, name, desc, profile=C.PROF_KEY_NONE):
-        return self.sendFile(jid.JID(peer_jid_s), filepath, name or None, desc or None, profile)
+        client = self.host.getClient(profile)
+        return self.sendFile(client, jid.JID(peer_jid_s), filepath, name or None, desc or None)
 
-    def sendFile(self, peer_jid, filepath, name=None, desc=None, profile=C.PROF_KEY_NONE):
+    def sendFile(self, client, peer_jid, filepath, name=None, desc=None):
         """Send a file using XEP-0096
 
         @param peer_jid(jid.JID): recipient
@@ -265,7 +255,6 @@
         @param profile: %(doc_profile)s
         @return: an unique id to identify the transfer
         """
-        client = self.host.getClient(profile)
         feature_elt = self.host.plugins["XEP-0020"].proposeFeatures({'stream-method': self.managed_stream_m}, namespace=None)
 
         file_transfer_elts = []
@@ -282,7 +271,7 @@
 
         file_transfer_elts.append(domish.Element((None, 'range')))
 
-        sid, offer_d = self._si.proposeStream(peer_jid, SI_PROFILE, feature_elt, file_transfer_elts, profile=client.profile)
+        sid, offer_d = self._si.proposeStream(client, peer_jid, SI_PROFILE, feature_elt, file_transfer_elts)
         args = [filepath, sid, size, client]
         offer_d.addCallbacks(self._fileCb, self._fileEb, args, None, args)
         return sid
@@ -318,15 +307,15 @@
             log.warning(u"Invalid stream method received")
             return
 
-        file_obj = self._f.File(self.host,
-                                filepath,
-                                uid=sid,
-                                size=size,
-                                profile=client.profile
-                                )
-        d = plugin.startStream(file_obj, jid.JID(iq_elt['from']), sid, profile=client.profile)
-        d.addCallback(self._sendCb, sid, file_obj, client.profile)
-        d.addErrback(self._sendEb, sid, file_obj, client.profile)
+        stream_object = stream.FileStreamObject(self.host,
+                                                client,
+                                                filepath,
+                                                uid=sid,
+                                                size=size,
+                                                )
+        d = plugin.startStream(client, stream_object, jid.JID(iq_elt['from']), sid)
+        d.addCallback(self._sendCb, client, sid, stream_object)
+        d.addErrback(self._sendEb, client, sid, stream_object)
 
     def _fileEb(self, failure, filepath, sid, size, client):
         if failure.check(error.StanzaError):
@@ -347,16 +336,16 @@
         else:
             log.error(u'Error while proposing stream: {}'.format(failure))
 
-    def _sendCb(self, dummy, sid, file_obj, profile):
+    def _sendCb(self, dummy, client, sid, stream_object):
         log.info(_(u'transfer {sid} successfuly finished [{profile}]').format(
             sid=sid,
-            profile=profile))
-        file_obj.close()
+            profile=client.profile))
+        stream_object.close()
 
-    def _sendEb(self, failure, sid, file_obj, profile):
+    def _sendEb(self, failure, client, sid, stream_object):
         log.warning(_(u'transfer {sid} failed [{profile}]: {reason}').format(
             sid=sid,
-            profile=profile,
+            profile=client.profile,
             reason=unicode(failure.value),
             ))
-        file_obj.close()
+        stream_object.close()