Mercurial > libervia-backend
diff libervia/backend/tools/stream.py @ 4071:4b842c1fb686
refactoring: renamed `sat` package to `libervia.backend`
author | Goffi <goffi@goffi.org> |
---|---|
date | Fri, 02 Jun 2023 11:49:51 +0200 |
parents | sat/tools/stream.py@524856bd7b19 |
children | 040095a5dc7f |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/libervia/backend/tools/stream.py Fri Jun 02 11:49:51 2023 +0200 @@ -0,0 +1,262 @@ +#!/usr/bin/env python3 + +# Libervia: an XMPP client +# Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org) + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +""" interfaces """ + +from argparse import OPTIONAL +from pathlib import Path +from typing import Callable, Optional, Union +import uuid +import os +from zope import interface +from libervia.backend.core import exceptions +from libervia.backend.core.constants import Const as C +from libervia.backend.core.core_types import SatXMPPEntity +from libervia.backend.core.log import getLogger +from twisted.protocols import basic +from twisted.internet import interfaces + +from libervia.backend.core.sat_main import SAT + +log = getLogger(__name__) + + +class IStreamProducer(interface.Interface): + def start_stream(consumer): + """start producing the stream + + @return (D): deferred fired when stream is finished + """ + pass + + +class SatFile: + """A file-like object to have high level files manipulation""" + + # TODO: manage "with" statement + + def __init__( + self, + host: SAT, + client: SatXMPPEntity, + path: Union[str, Path], + mode: str = "rb", + uid: Optional[str] = None, + size: Optional[int] = None, + data_cb: Optional[Callable] = None, + auto_end_signals: bool = True, + check_size_with_read: bool = False, + pre_close_cb: Optional[Callable]=None + ) -> None: + """ + @param host: %(doc_host)s + @param path(Path, str): path to the file to get or write to + @param mode(str): same as for built-in "open" function + @param uid(unicode, None): unique id identifing this progressing element + This uid will be used with self.host.progress_get + will be automaticaly generated if None + @param size(None, int): size of the file (when known in advance) + @param data_cb(None, callable): method to call on each data read/write + can be used to do processing like calculating hash. + if data_cb return a non None value, it will be used instead of the + data read/to write + @param auto_end_signals(bool): if True, progress_finished and progress_error signals + are automatically sent. + if False, you'll have to call self.progress_finished and self.progress_error + yourself. + progress_started signal is always sent automatically + @param check_size_with_read(bool): if True, size will be checked using number of + bytes read or written. This is useful when data_cb modifiy len of file. + @param pre_close_cb: + """ + self.host = host + self.profile = client.profile + self.uid = uid or str(uuid.uuid4()) + self._file = open(path, mode) + self.size = size + self.data_cb = data_cb + self.auto_end_signals = auto_end_signals + self.pre_close_cb = pre_close_cb + metadata = self.get_progress_metadata() + self.host.register_progress_cb( + self.uid, self.get_progress, metadata, profile=client.profile + ) + self.host.bridge.progress_started(self.uid, metadata, client.profile) + + self._transfer_count = 0 if check_size_with_read else None + + @property + def check_size_with_read(self): + return self._transfer_count is not None + + @check_size_with_read.setter + def check_size_with_read(self, value): + if value and self._transfer_count is None: + self._transfer_count = 0 + else: + self._transfer_count = None + + def check_size(self): + """Check that current size correspond to given size + + must be used when the transfer is supposed to be finished + @return (bool): True if the position is the same as given size + @raise exceptions.NotFound: size has not be specified + """ + if self.check_size_with_read: + position = self._transfer_count + else: + position = self._file.tell() + if self.size is None: + raise exceptions.NotFound + return position == self.size + + def close(self, progress_metadata=None, error=None): + """Close the current file + + @param progress_metadata(None, dict): metadata to send with _onProgressFinished + message + @param error(None, unicode): set to an error message if progress was not + successful + mutually exclusive with progress_metadata + error can happen even if error is None, if current size differ from given size + """ + if self._file.closed: + return # avoid double close (which is allowed) error + if self.pre_close_cb is not None: + self.pre_close_cb() + if error is None: + try: + size_ok = self.check_size() + except exceptions.NotFound: + size_ok = True + if not size_ok: + error = "declared and actual size mismatch" + log.warning(error) + progress_metadata = None + + self._file.close() + + if self.auto_end_signals: + if error is None: + self.progress_finished(progress_metadata) + else: + assert progress_metadata is None + self.progress_error(error) + + self.host.remove_progress_cb(self.uid, self.profile) + if error is not None: + log.error(f"file {self._file} closed with an error: {error}") + + @property + def closed(self): + return self._file.closed + + def progress_finished(self, metadata=None): + if metadata is None: + metadata = {} + self.host.bridge.progress_finished(self.uid, metadata, self.profile) + + def progress_error(self, error): + self.host.bridge.progress_error(self.uid, error, self.profile) + + def flush(self): + self._file.flush() + + def write(self, buf): + if self.data_cb is not None: + ret = self.data_cb(buf) + if ret is not None: + buf = ret + if self._transfer_count is not None: + self._transfer_count += len(buf) + self._file.write(buf) + + def read(self, size=-1): + read = self._file.read(size) + if self.data_cb is not None: + ret = self.data_cb(read) + if ret is not None: + read = ret + if self._transfer_count is not None: + self._transfer_count += len(read) + return read + + def seek(self, offset, whence=os.SEEK_SET): + self._file.seek(offset, whence) + + def tell(self): + return self._file.tell() + + @property + def mode(self): + return self._file.mode + + def get_progress_metadata(self): + """Return progression metadata as given to progress_started + + @return (dict): metadata (check bridge for documentation) + """ + metadata = {"type": C.META_TYPE_FILE} + + mode = self._file.mode + if "+" in mode: + pass # we have no direction in read/write modes + elif mode in ("r", "rb"): + metadata["direction"] = "out" + elif mode in ("w", "wb"): + metadata["direction"] = "in" + elif "U" in mode: + metadata["direction"] = "out" + else: + raise exceptions.InternalError + + metadata["name"] = self._file.name + + return metadata + + def get_progress(self, progress_id, profile): + ret = {"position": self._file.tell()} + if self.size: + ret["size"] = self.size + return ret + + +@interface.implementer(IStreamProducer) +@interface.implementer(interfaces.IConsumer) +class FileStreamObject(basic.FileSender): + def __init__(self, host, client, path, **kwargs): + """ + + A SatFile will be created and put in self.file_obj + @param path(unicode): path to the file + @param **kwargs: kw arguments to pass to SatFile + """ + self.file_obj = SatFile(host, client, path, **kwargs) + + def registerProducer(self, producer, streaming): + pass + + def start_stream(self, consumer): + return self.beginFileTransfer(self.file_obj, consumer) + + def write(self, data): + self.file_obj.write(data) + + def close(self, *args, **kwargs): + self.file_obj.close(*args, **kwargs)