Mercurial > libervia-backend
diff src/plugins/plugin_misc_file.py @ 2489:e2a7bb875957
plugin pipe/stream, file transfert: refactoring and improvments:
this is a big patch as things had to be changed at the same time.
- changed methods using profile argument to use client instead
- move SatFile in a new tools.stream module, has it should be part of core, not a plugin
- new IStreamProducer interface, to handler starting a pull producer
- new FileStreamObject which create a stream producer/consumer from a SatFile
- plugin pipe is no more using unix named pipe, as it complicate the thing,
special care need to be taken to not block, and it's generally not necessary.
Instead a socket is now used, so the plugin has been renomed to jingle stream.
- bad connection/error should be better handler in jingle stream plugin, and code should not block anymore
- jp pipe commands have been updated accordingly
fix bug 237
author | Goffi <goffi@goffi.org> |
---|---|
date | Thu, 08 Feb 2018 00:37:42 +0100 |
parents | 0046283a285d |
children | 7ad5f2c4e34a |
line wrap: on
line diff
--- a/src/plugins/plugin_misc_file.py Thu Feb 01 07:24:34 2018 +0100 +++ b/src/plugins/plugin_misc_file.py Thu Feb 08 00:37:42 2018 +0100 @@ -23,11 +23,11 @@ log = getLogger(__name__) from sat.core import exceptions from sat.tools import xml_tools +from sat.tools import stream from twisted.internet import defer from twisted.words.protocols.jabber import jid import os import os.path -import uuid PLUGIN_INFO = { @@ -52,143 +52,8 @@ PROGRESS_ID_KEY = 'progress_id' -class SatFile(object): - """A file-like object to have high level files manipulation""" - # TODO: manage "with" statement - - def __init__(self, host, path, mode='rb', uid=None, size=None, data_cb=None, auto_end_signals=True, profile=C.PROF_KEY_NONE): - """ - @param host: %(doc_host)s - @param path(str): path of the file to get - @param mode(str): same as for built-in "open" function - @param uid(unicode, None): unique id identifing this progressing element - This uid will be used with self.host.progressGet - will be automaticaly generated if None - @param size(None, int): size of the file - @param data_cb(None, callable): method to call on each data read/write - mainly useful to do things like calculating hash - @param auto_end_signals(bool): if True, progressFinished and progressError signals are automatically sent - if False, you'll have to call self.progressFinished and self.progressError yourself - progressStarted signal is always sent automatically - """ - self.host = host - self.uid = uid or unicode(uuid.uuid4()) - self._file = open(path, mode) - self.size = size - self.data_cb = data_cb - self.profile = profile - self.auto_end_signals = auto_end_signals - metadata = self.getProgressMetadata() - self.host.registerProgressCb(self.uid, self.getProgress, metadata, profile=profile) - self.host.bridge.progressStarted(self.uid, metadata, self.profile) - - def checkSize(self): - """Check that current size correspond to given size - - must be used when the transfer is supposed to be finished - @return (bool): True if the position is the same as given size - @raise exceptions.NotFound: size has not be specified - """ - position = self._file.tell() - if self.size is None: - raise exceptions.NotFound - return position == self.size - - - def close(self, progress_metadata=None, error=None): - """Close the current file - - @param progress_metadata(None, dict): metadata to send with _onProgressFinished message - @param error(None, unicode): set to an error message if progress was not successful - mutually exclusive with progress_metadata - error can happen even if error is None, if current size differ from given size - """ - if self._file.closed: - return # avoid double close (which is allowed) error - if error is None: - try: - size_ok = self.checkSize() - except exceptions.NotFound: - size_ok = True - if not size_ok: - error = u'declared and actual size mismatch' - log.warning(error) - progress_metadata = None - - self._file.close() - - if self.auto_end_signals: - if error is None: - self.progressFinished(progress_metadata) - else: - assert progress_metadata is None - self.progressError(error) - - self.host.removeProgressCb(self.uid, self.profile) - - def progressFinished(self, metadata=None): - if metadata is None: - metadata = {} - self.host.bridge.progressFinished(self.uid, metadata, self.profile) - - def progressError(self, error): - self.host.bridge.progressError(self.uid, error, self.profile) - - def flush(self): - self._file.flush() - - def write(self, buf): - self._file.write(buf) - if self.data_cb is not None: - return self.data_cb(buf) - - def read(self, size=-1): - read = self._file.read(size) - if self.data_cb is not None and read: - self.data_cb(read) - return read - - def seek(self, offset, whence=os.SEEK_SET): - self._file.seek(offset, whence) - - def tell(self): - return self._file.tell() - - def mode(self): - return self._file.mode() - - def getProgressMetadata(self): - """Return progression metadata as given to progressStarted - - @return (dict): metadata (check bridge for documentation) - """ - metadata = {'type': C.META_TYPE_FILE} - - mode = self._file.mode - if '+' in mode: - pass # we have no direction in read/write modes - elif mode in ('r', 'rb'): - metadata['direction'] = 'out' - elif mode in ('w', 'wb'): - metadata['direction'] = 'in' - elif 'U' in mode: - metadata['direction'] = 'out' - else: - raise exceptions.InternalError - - metadata['name'] = self._file.name - - return metadata - - def getProgress(self, progress_id, profile): - ret = {'position': self._file.tell()} - if self.size: - ret['size'] = self.size - return ret - - class FilePlugin(object): - File=SatFile + File=stream.SatFile def __init__(self, host): log.info(_("plugin File initialization")) @@ -198,10 +63,11 @@ host.importMenu((D_("Action"), D_("send file")), self._fileSendMenu, security_limit=10, help_string=D_("Send a file"), type_=C.MENU_SINGLE) def _fileSend(self, peer_jid_s, filepath, name="", file_desc="", profile=C.PROF_KEY_NONE): - return self.fileSend(jid.JID(peer_jid_s), filepath, name or None, file_desc or None, profile) + client = self.host.getClient(profile) + return self.fileSend(client, jid.JID(peer_jid_s), filepath, name or None, file_desc or None) @defer.inlineCallbacks - def fileSend(self, peer_jid, filepath, filename=None, file_desc=None, profile=C.PROF_KEY_NONE): + def fileSend(self, client, peer_jid, filepath, filename=None, file_desc=None): """Send a file using best available method @param peer_jid(jid.JID): jid of the destinee @@ -211,7 +77,6 @@ @param profile: %(doc_profile)s @return (dict): action dictionary, with progress id in case of success, else xmlui message """ - client = self.host.getClient(profile) if not os.path.isfile(filepath): raise exceptions.DataError(u"The given path doesn't link to a file") if not filename: @@ -220,18 +85,18 @@ has_feature = yield self.host.hasFeature(client, namespace, peer_jid) if has_feature: log.info(u"{name} method will be used to send the file".format(name=method_name)) - progress_id = yield callback(peer_jid, filepath, filename, file_desc, profile) + progress_id = yield callback(client, peer_jid, filepath, filename, file_desc) defer.returnValue({'progress': progress_id}) msg = u"Can't find any method to send file to {jid}".format(jid=peer_jid.full()) log.warning(msg) defer.returnValue({'xmlui': xml_tools.note(u"Can't transfer file", msg, C.XMLUI_DATA_LVL_WARNING).toXml()}) - def _onFileChoosed(self, peer_jid, data, profile): + def _onFileChoosed(self, client, peer_jid, data): cancelled = C.bool(data.get("cancelled", C.BOOL_FALSE)) if cancelled: return path=data['path'] - return self.fileSend(peer_jid, path, profile=profile) + return self.fileSend(client, peer_jid, path) def _fileSendMenu(self, data, profile): """ XMLUI activated by menu: return file sending UI @@ -243,7 +108,7 @@ except RuntimeError: raise exceptions.DataError(_("Invalid JID")) - file_choosed_id = self.host.registerCallback(lambda data, profile: self._onFileChoosed(jid_, data, profile), with_data=True, one_shot=True) + file_choosed_id = self.host.registerCallback(lambda data, profile: self._onFileChoosed(self.host.getClient(profile), jid_, data), with_data=True, one_shot=True) xml_ui = xml_tools.XMLUI( C.XMLUI_DIALOG, dialog_opt = { @@ -278,25 +143,37 @@ # Dialogs with user # the overwrite check is done here - def _openFileWrite(self, file_path, transfer_data, file_data, profile): - assert 'file_obj' not in transfer_data - transfer_data['file_obj'] = SatFile( - self.host, - file_path, - 'wb', - uid=file_data[PROGRESS_ID_KEY], - size=file_data['size'], - data_cb = file_data.get('data_cb'), - profile=profile, - ) + def _openFileWrite(self, client, file_path, transfer_data, file_data, stream_object): + if stream_object: + assert 'stream_object' not in transfer_data + transfer_data['stream_object'] = stream.FileStreamObject( + self.host, + client, + file_path, + mode='wb', + uid=file_data[PROGRESS_ID_KEY], + size=file_data['size'], + data_cb = file_data.get('data_cb'), + ) + else: + assert 'file_obj' not in transfer_data + transfer_data['file_obj'] = stream.SatFile( + self.host, + client, + file_path, + mode='wb', + uid=file_data[PROGRESS_ID_KEY], + size=file_data['size'], + data_cb = file_data.get('data_cb'), + ) - def _gotConfirmation(self, data, peer_jid, transfer_data, file_data, profile): + def _gotConfirmation(self, data, client, peer_jid, transfer_data, file_data, stream_object): """Called when the permission and dest path have been received @param peer_jid(jid.JID): jid of the file sender @param transfer_data(dict): same as for [self.getDestDir] @param file_data(dict): same as for [self.getDestDir] - @param profile: %(doc_profile)s + @param stream_object(bool): same as for [self.getDestDir] return (bool): True if copy is wanted and OK False if user wants to cancel if file exists ask confirmation and call again self._getDestDir if needed @@ -311,10 +188,10 @@ if os.path.exists(file_path): def check_overwrite(overwrite): if overwrite: - self._openFileWrite(file_path, transfer_data, file_data, profile) + self._openFileWrite(client, file_path, transfer_data, file_data, stream_object) return True else: - return self.getDestDir(peer_jid, transfer_data, file_data, profile) + return self.getDestDir(client, peer_jid, transfer_data, file_data) exists_d = xml_tools.deferConfirm( self.host, @@ -325,14 +202,14 @@ 'meta_progress_id': file_data[PROGRESS_ID_KEY] }, security_limit=SECURITY_LIMIT, - profile=profile) + profile=client.profile) exists_d.addCallback(check_overwrite) return exists_d - self._openFileWrite(file_path, transfer_data, file_data, profile) + self._openFileWrite(client, file_path, transfer_data, file_data, stream_object) return True - def getDestDir(self, peer_jid, transfer_data, file_data, profile): + def getDestDir(self, client, peer_jid, transfer_data, file_data, stream_object=False): """Request confirmation and destination dir to user Overwrite confirmation is managed. @@ -341,7 +218,7 @@ @param filename(unicode): name of the file @param transfer_data(dict): data of the transfer session, it will be only used to store the file_obj. - "file_obj" key *MUST NOT* exist before using getDestDir + "file_obj" (or "stream_object") key *MUST NOT* exist before using getDestDir @param file_data(dict): information about the file to be transfered It MUST contain the following keys: - peer_jid (jid.JID): other peer jid @@ -355,7 +232,8 @@ - data_cb (callable): method called on each data read/write "file_path" will be added to this dict once destination selected "size_human" will also be added with human readable file size - @param profile: %(doc_profile)s + @param stream_object(bool): if True, a stream_object will be used instead of file_obj + a stream.FileStreamObject will be used return (defer.Deferred): True if transfer is accepted """ filename = file_data['name'] @@ -373,6 +251,6 @@ 'meta_progress_id': file_data[PROGRESS_ID_KEY] }, security_limit=SECURITY_LIMIT, - profile=profile) - d.addCallback(self._gotConfirmation, peer_jid, transfer_data, file_data, profile) + profile=client.profile) + d.addCallback(self._gotConfirmation, client, peer_jid, transfer_data, file_data, stream_object) return d