Mercurial > libervia-backend
diff src/tools/stream.py @ 2489:e2a7bb875957
plugin pipe/stream, file transfert: refactoring and improvments:
this is a big patch as things had to be changed at the same time.
- changed methods using profile argument to use client instead
- move SatFile in a new tools.stream module, has it should be part of core, not a plugin
- new IStreamProducer interface, to handler starting a pull producer
- new FileStreamObject which create a stream producer/consumer from a SatFile
- plugin pipe is no more using unix named pipe, as it complicate the thing,
special care need to be taken to not block, and it's generally not necessary.
Instead a socket is now used, so the plugin has been renomed to jingle stream.
- bad connection/error should be better handler in jingle stream plugin, and code should not block anymore
- jp pipe commands have been updated accordingly
fix bug 237
author | Goffi <goffi@goffi.org> |
---|---|
date | Thu, 08 Feb 2018 00:37:42 +0100 |
parents | |
children |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/tools/stream.py Thu Feb 08 00:37:42 2018 +0100 @@ -0,0 +1,200 @@ +#!/usr/bin/env python2 +# -*- coding: utf-8 -*- + +# SAT: a jabber client +# Copyright (C) 2009-2018 Jérôme Poisson (goffi@goffi.org) + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +""" interfaces """ + +from sat.core import exceptions +from sat.core.constants import Const as C +from sat.core.log import getLogger +from twisted.protocols import basic +from twisted.internet import interfaces +from zope import interface +import uuid +import os + +log = getLogger(__name__) + + +class IStreamProducer(interface.Interface): + + def startStream(consumer): + """start producing the stream + + @return (D): deferred fired when stream is finished + """ + + +class SatFile(object): + """A file-like object to have high level files manipulation""" + # TODO: manage "with" statement + + def __init__(self, host, client, path, mode='rb', uid=None, size=None, data_cb=None, auto_end_signals=True): + """ + @param host: %(doc_host)s + @param path(str): path of the file to get + @param mode(str): same as for built-in "open" function + @param uid(unicode, None): unique id identifing this progressing element + This uid will be used with self.host.progressGet + will be automaticaly generated if None + @param size(None, int): size of the file (when known in advance) + @param data_cb(None, callable): method to call on each data read/write + mainly useful to do things like calculating hash + @param auto_end_signals(bool): if True, progressFinished and progressError signals are automatically sent + if False, you'll have to call self.progressFinished and self.progressError yourself + progressStarted signal is always sent automatically + """ + self.host = host + self.profile = client.profile + self.uid = uid or unicode(uuid.uuid4()) + self._file = open(path, mode) + self.size = size + self.data_cb = data_cb + self.auto_end_signals = auto_end_signals + metadata = self.getProgressMetadata() + self.host.registerProgressCb(self.uid, self.getProgress, metadata, profile=client.profile) + self.host.bridge.progressStarted(self.uid, metadata, client.profile) + + def checkSize(self): + """Check that current size correspond to given size + + must be used when the transfer is supposed to be finished + @return (bool): True if the position is the same as given size + @raise exceptions.NotFound: size has not be specified + """ + position = self._file.tell() + if self.size is None: + raise exceptions.NotFound + return position == self.size + + def close(self, progress_metadata=None, error=None): + """Close the current file + + @param progress_metadata(None, dict): metadata to send with _onProgressFinished message + @param error(None, unicode): set to an error message if progress was not successful + mutually exclusive with progress_metadata + error can happen even if error is None, if current size differ from given size + """ + if self._file.closed: + return # avoid double close (which is allowed) error + if error is None: + try: + size_ok = self.checkSize() + except exceptions.NotFound: + size_ok = True + if not size_ok: + error = u'declared and actual size mismatch' + log.warning(error) + progress_metadata = None + + self._file.close() + + if self.auto_end_signals: + if error is None: + self.progressFinished(progress_metadata) + else: + assert progress_metadata is None + self.progressError(error) + + self.host.removeProgressCb(self.uid, self.profile) + + def progressFinished(self, metadata=None): + if metadata is None: + metadata = {} + self.host.bridge.progressFinished(self.uid, metadata, self.profile) + + def progressError(self, error): + self.host.bridge.progressError(self.uid, error, self.profile) + + def flush(self): + self._file.flush() + + def write(self, buf): + self._file.write(buf) + if self.data_cb is not None: + return self.data_cb(buf) + + def read(self, size=-1): + read = self._file.read(size) + if self.data_cb is not None and read: + self.data_cb(read) + return read + + def seek(self, offset, whence=os.SEEK_SET): + self._file.seek(offset, whence) + + def tell(self): + return self._file.tell() + + def mode(self): + return self._file.mode() + + def getProgressMetadata(self): + """Return progression metadata as given to progressStarted + + @return (dict): metadata (check bridge for documentation) + """ + metadata = {'type': C.META_TYPE_FILE} + + mode = self._file.mode + if '+' in mode: + pass # we have no direction in read/write modes + elif mode in ('r', 'rb'): + metadata['direction'] = 'out' + elif mode in ('w', 'wb'): + metadata['direction'] = 'in' + elif 'U' in mode: + metadata['direction'] = 'out' + else: + raise exceptions.InternalError + + metadata['name'] = self._file.name + + return metadata + + def getProgress(self, progress_id, profile): + ret = {'position': self._file.tell()} + if self.size: + ret['size'] = self.size + return ret + + +@interface.implementer(IStreamProducer) +@interface.implementer(interfaces.IConsumer) +class FileStreamObject(basic.FileSender): + + def __init__(self, host, client, path, **kwargs): + """ + + A SatFile will be created and put in self.file_obj + @param path(unicode): path to the file + @param **kwargs: kw arguments to pass to SatFile + """ + self.file_obj = SatFile(host, client, path, **kwargs) + + def registerProducer(self, producer, streaming): + pass + + def startStream(self, consumer): + return self.beginFileTransfer(self.file_obj, consumer) + + def write(self, data): + self.file_obj.write(data) + + def close(self, *args, **kwargs): + self.file_obj.close(*args, **kwargs)