diff src/plugins/plugin_misc_file.py @ 2489:e2a7bb875957

plugin pipe/stream, file transfert: refactoring and improvments: this is a big patch as things had to be changed at the same time. - changed methods using profile argument to use client instead - move SatFile in a new tools.stream module, has it should be part of core, not a plugin - new IStreamProducer interface, to handler starting a pull producer - new FileStreamObject which create a stream producer/consumer from a SatFile - plugin pipe is no more using unix named pipe, as it complicate the thing, special care need to be taken to not block, and it's generally not necessary. Instead a socket is now used, so the plugin has been renomed to jingle stream. - bad connection/error should be better handler in jingle stream plugin, and code should not block anymore - jp pipe commands have been updated accordingly fix bug 237
author Goffi <goffi@goffi.org>
date Thu, 08 Feb 2018 00:37:42 +0100
parents 0046283a285d
children 7ad5f2c4e34a
line wrap: on
line diff
--- a/src/plugins/plugin_misc_file.py	Thu Feb 01 07:24:34 2018 +0100
+++ b/src/plugins/plugin_misc_file.py	Thu Feb 08 00:37:42 2018 +0100
@@ -23,11 +23,11 @@
 log = getLogger(__name__)
 from sat.core import exceptions
 from sat.tools import xml_tools
+from sat.tools import stream
 from twisted.internet import defer
 from twisted.words.protocols.jabber import jid
 import os
 import os.path
-import uuid
 
 
 PLUGIN_INFO = {
@@ -52,143 +52,8 @@
 PROGRESS_ID_KEY = 'progress_id'
 
 
-class SatFile(object):
-    """A file-like object to have high level files manipulation"""
-    # TODO: manage "with" statement
-
-    def __init__(self, host, path, mode='rb', uid=None, size=None, data_cb=None, auto_end_signals=True, profile=C.PROF_KEY_NONE):
-        """
-        @param host: %(doc_host)s
-        @param path(str): path of the file to get
-        @param mode(str): same as for built-in "open" function
-        @param uid(unicode, None): unique id identifing this progressing element
-            This uid will be used with self.host.progressGet
-            will be automaticaly generated if None
-        @param size(None, int): size of the file
-        @param data_cb(None, callable): method to call on each data read/write
-            mainly useful to do things like calculating hash
-        @param auto_end_signals(bool): if True, progressFinished and progressError signals are automatically sent
-            if False, you'll have to call self.progressFinished and self.progressError yourself
-            progressStarted signal is always sent automatically
-        """
-        self.host = host
-        self.uid = uid or unicode(uuid.uuid4())
-        self._file = open(path, mode)
-        self.size = size
-        self.data_cb = data_cb
-        self.profile = profile
-        self.auto_end_signals = auto_end_signals
-        metadata = self.getProgressMetadata()
-        self.host.registerProgressCb(self.uid, self.getProgress, metadata, profile=profile)
-        self.host.bridge.progressStarted(self.uid, metadata, self.profile)
-
-    def checkSize(self):
-        """Check that current size correspond to given size
-
-        must be used when the transfer is supposed to be finished
-        @return (bool): True if the position is the same as given size
-        @raise exceptions.NotFound: size has not be specified
-        """
-        position = self._file.tell()
-        if self.size is None:
-            raise exceptions.NotFound
-        return position == self.size
-
-
-    def close(self, progress_metadata=None, error=None):
-        """Close the current file
-
-        @param progress_metadata(None, dict): metadata to send with _onProgressFinished message
-        @param error(None, unicode): set to an error message if progress was not successful
-            mutually exclusive with progress_metadata
-            error can happen even if error is None, if current size differ from given size
-        """
-        if self._file.closed:
-            return # avoid double close (which is allowed) error
-        if error is None:
-            try:
-                size_ok = self.checkSize()
-            except exceptions.NotFound:
-                size_ok = True
-            if not size_ok:
-                error = u'declared and actual size mismatch'
-                log.warning(error)
-                progress_metadata = None
-
-        self._file.close()
-
-        if self.auto_end_signals:
-            if error is None:
-                self.progressFinished(progress_metadata)
-            else:
-                assert progress_metadata is None
-                self.progressError(error)
-
-        self.host.removeProgressCb(self.uid, self.profile)
-
-    def progressFinished(self, metadata=None):
-        if metadata is None:
-            metadata = {}
-        self.host.bridge.progressFinished(self.uid, metadata, self.profile)
-
-    def progressError(self, error):
-        self.host.bridge.progressError(self.uid, error, self.profile)
-
-    def flush(self):
-        self._file.flush()
-
-    def write(self, buf):
-        self._file.write(buf)
-        if self.data_cb is not None:
-            return self.data_cb(buf)
-
-    def read(self, size=-1):
-        read = self._file.read(size)
-        if self.data_cb is not None and read:
-            self.data_cb(read)
-        return read
-
-    def seek(self, offset, whence=os.SEEK_SET):
-        self._file.seek(offset, whence)
-
-    def tell(self):
-        return self._file.tell()
-
-    def mode(self):
-        return self._file.mode()
-
-    def getProgressMetadata(self):
-        """Return progression metadata as given to progressStarted
-
-        @return (dict): metadata (check bridge for documentation)
-        """
-        metadata = {'type': C.META_TYPE_FILE}
-
-        mode = self._file.mode
-        if '+' in mode:
-            pass # we have no direction in read/write modes
-        elif mode in ('r', 'rb'):
-            metadata['direction'] = 'out'
-        elif mode in ('w', 'wb'):
-            metadata['direction'] = 'in'
-        elif 'U' in mode:
-            metadata['direction'] = 'out'
-        else:
-            raise exceptions.InternalError
-
-        metadata['name'] = self._file.name
-
-        return metadata
-
-    def getProgress(self, progress_id, profile):
-        ret = {'position': self._file.tell()}
-        if self.size:
-            ret['size'] = self.size
-        return ret
-
-
 class FilePlugin(object):
-    File=SatFile
+    File=stream.SatFile
 
     def __init__(self, host):
         log.info(_("plugin File initialization"))
@@ -198,10 +63,11 @@
         host.importMenu((D_("Action"), D_("send file")), self._fileSendMenu, security_limit=10, help_string=D_("Send a file"), type_=C.MENU_SINGLE)
 
     def _fileSend(self, peer_jid_s, filepath, name="", file_desc="", profile=C.PROF_KEY_NONE):
-        return self.fileSend(jid.JID(peer_jid_s), filepath, name or None, file_desc or None, profile)
+        client = self.host.getClient(profile)
+        return self.fileSend(client, jid.JID(peer_jid_s), filepath, name or None, file_desc or None)
 
     @defer.inlineCallbacks
-    def fileSend(self, peer_jid, filepath, filename=None, file_desc=None, profile=C.PROF_KEY_NONE):
+    def fileSend(self, client, peer_jid, filepath, filename=None, file_desc=None):
         """Send a file using best available method
 
         @param peer_jid(jid.JID): jid of the destinee
@@ -211,7 +77,6 @@
         @param profile: %(doc_profile)s
         @return (dict): action dictionary, with progress id in case of success, else xmlui message
         """
-        client = self.host.getClient(profile)
         if not os.path.isfile(filepath):
             raise exceptions.DataError(u"The given path doesn't link to a file")
         if not filename:
@@ -220,18 +85,18 @@
             has_feature = yield self.host.hasFeature(client, namespace, peer_jid)
             if has_feature:
                 log.info(u"{name} method will be used to send the file".format(name=method_name))
-                progress_id = yield callback(peer_jid, filepath, filename, file_desc, profile)
+                progress_id = yield callback(client, peer_jid, filepath, filename, file_desc)
                 defer.returnValue({'progress': progress_id})
         msg = u"Can't find any method to send file to {jid}".format(jid=peer_jid.full())
         log.warning(msg)
         defer.returnValue({'xmlui': xml_tools.note(u"Can't transfer file", msg, C.XMLUI_DATA_LVL_WARNING).toXml()})
 
-    def _onFileChoosed(self, peer_jid, data, profile):
+    def _onFileChoosed(self, client, peer_jid, data):
         cancelled = C.bool(data.get("cancelled", C.BOOL_FALSE))
         if cancelled:
             return
         path=data['path']
-        return self.fileSend(peer_jid, path, profile=profile)
+        return self.fileSend(client, peer_jid, path)
 
     def _fileSendMenu(self, data, profile):
         """ XMLUI activated by menu: return file sending UI
@@ -243,7 +108,7 @@
         except RuntimeError:
             raise exceptions.DataError(_("Invalid JID"))
 
-        file_choosed_id = self.host.registerCallback(lambda data, profile: self._onFileChoosed(jid_, data, profile), with_data=True, one_shot=True)
+        file_choosed_id = self.host.registerCallback(lambda data, profile: self._onFileChoosed(self.host.getClient(profile), jid_, data), with_data=True, one_shot=True)
         xml_ui = xml_tools.XMLUI(
             C.XMLUI_DIALOG,
             dialog_opt = {
@@ -278,25 +143,37 @@
     # Dialogs with user
     # the overwrite check is done here
 
-    def _openFileWrite(self, file_path, transfer_data, file_data, profile):
-        assert 'file_obj' not in transfer_data
-        transfer_data['file_obj'] = SatFile(
-            self.host,
-            file_path,
-            'wb',
-            uid=file_data[PROGRESS_ID_KEY],
-            size=file_data['size'],
-            data_cb = file_data.get('data_cb'),
-            profile=profile,
-            )
+    def _openFileWrite(self, client, file_path, transfer_data, file_data, stream_object):
+        if stream_object:
+            assert 'stream_object' not in transfer_data
+            transfer_data['stream_object'] = stream.FileStreamObject(
+                self.host,
+                client,
+                file_path,
+                mode='wb',
+                uid=file_data[PROGRESS_ID_KEY],
+                size=file_data['size'],
+                data_cb = file_data.get('data_cb'),
+                )
+        else:
+            assert 'file_obj' not in transfer_data
+            transfer_data['file_obj'] = stream.SatFile(
+                self.host,
+                client,
+                file_path,
+                mode='wb',
+                uid=file_data[PROGRESS_ID_KEY],
+                size=file_data['size'],
+                data_cb = file_data.get('data_cb'),
+                )
 
-    def _gotConfirmation(self, data, peer_jid, transfer_data, file_data, profile):
+    def _gotConfirmation(self, data, client, peer_jid, transfer_data, file_data, stream_object):
         """Called when the permission and dest path have been received
 
         @param peer_jid(jid.JID): jid of the file sender
         @param transfer_data(dict): same as for [self.getDestDir]
         @param file_data(dict): same as for [self.getDestDir]
-        @param profile: %(doc_profile)s
+        @param stream_object(bool): same as for [self.getDestDir]
         return (bool): True if copy is wanted and OK
             False if user wants to cancel
             if file exists ask confirmation and call again self._getDestDir if needed
@@ -311,10 +188,10 @@
         if os.path.exists(file_path):
             def check_overwrite(overwrite):
                 if overwrite:
-                    self._openFileWrite(file_path, transfer_data, file_data, profile)
+                    self._openFileWrite(client, file_path, transfer_data, file_data, stream_object)
                     return True
                 else:
-                    return self.getDestDir(peer_jid, transfer_data, file_data, profile)
+                    return self.getDestDir(client, peer_jid, transfer_data, file_data)
 
             exists_d = xml_tools.deferConfirm(
                 self.host,
@@ -325,14 +202,14 @@
                               'meta_progress_id': file_data[PROGRESS_ID_KEY]
                              },
                 security_limit=SECURITY_LIMIT,
-                profile=profile)
+                profile=client.profile)
             exists_d.addCallback(check_overwrite)
             return exists_d
 
-        self._openFileWrite(file_path, transfer_data, file_data, profile)
+        self._openFileWrite(client, file_path, transfer_data, file_data, stream_object)
         return True
 
-    def getDestDir(self, peer_jid, transfer_data, file_data, profile):
+    def getDestDir(self, client, peer_jid, transfer_data, file_data, stream_object=False):
         """Request confirmation and destination dir to user
 
         Overwrite confirmation is managed.
@@ -341,7 +218,7 @@
         @param filename(unicode): name of the file
         @param transfer_data(dict): data of the transfer session,
             it will be only used to store the file_obj.
-            "file_obj" key *MUST NOT* exist before using getDestDir
+            "file_obj" (or "stream_object") key *MUST NOT* exist before using getDestDir
         @param file_data(dict): information about the file to be transfered
             It MUST contain the following keys:
                 - peer_jid (jid.JID): other peer jid
@@ -355,7 +232,8 @@
                 - data_cb (callable): method called on each data read/write
             "file_path" will be added to this dict once destination selected
             "size_human" will also be added with human readable file size
-        @param profile: %(doc_profile)s
+        @param stream_object(bool): if True, a stream_object will be used instead of file_obj
+            a stream.FileStreamObject will be used
         return (defer.Deferred): True if transfer is accepted
         """
         filename = file_data['name']
@@ -373,6 +251,6 @@
                           'meta_progress_id': file_data[PROGRESS_ID_KEY]
                          },
             security_limit=SECURITY_LIMIT,
-            profile=profile)
-        d.addCallback(self._gotConfirmation, peer_jid, transfer_data, file_data, profile)
+            profile=client.profile)
+        d.addCallback(self._gotConfirmation, client, peer_jid, transfer_data, file_data, stream_object)
         return d