diff libervia/backend/tools/stream.py @ 4071:4b842c1fb686

refactoring: renamed `sat` package to `libervia.backend`
author Goffi <goffi@goffi.org>
date Fri, 02 Jun 2023 11:49:51 +0200
parents sat/tools/stream.py@524856bd7b19
children 040095a5dc7f
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/libervia/backend/tools/stream.py	Fri Jun 02 11:49:51 2023 +0200
@@ -0,0 +1,262 @@
+#!/usr/bin/env python3
+
+# Libervia: an XMPP client
+# Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org)
+
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU Affero General Public License for more details.
+
+# You should have received a copy of the GNU Affero General Public License
+# along with this program.  If not, see <http://www.gnu.org/licenses/>.
+
+""" interfaces """
+
+from argparse import OPTIONAL
+from pathlib import Path
+from typing import Callable, Optional, Union
+import uuid
+import os
+from zope import interface
+from libervia.backend.core import exceptions
+from libervia.backend.core.constants import Const as C
+from libervia.backend.core.core_types import SatXMPPEntity
+from libervia.backend.core.log import getLogger
+from twisted.protocols import basic
+from twisted.internet import interfaces
+
+from libervia.backend.core.sat_main import SAT
+
+log = getLogger(__name__)
+
+
+class IStreamProducer(interface.Interface):
+    def start_stream(consumer):
+        """start producing the stream
+
+        @return (D): deferred fired when stream is finished
+        """
+        pass
+
+
+class SatFile:
+    """A file-like object to have high level files manipulation"""
+
+    # TODO: manage "with" statement
+
+    def __init__(
+        self,
+        host: SAT,
+        client: SatXMPPEntity,
+        path: Union[str, Path],
+        mode: str = "rb",
+        uid: Optional[str] = None,
+        size: Optional[int] = None,
+        data_cb: Optional[Callable] = None,
+        auto_end_signals: bool = True,
+        check_size_with_read: bool = False,
+        pre_close_cb: Optional[Callable]=None
+    ) -> None:
+        """
+        @param host: %(doc_host)s
+        @param path(Path, str): path to the file to get or write to
+        @param mode(str): same as for built-in "open" function
+        @param uid(unicode, None): unique id identifing this progressing element
+            This uid will be used with self.host.progress_get
+            will be automaticaly generated if None
+        @param size(None, int): size of the file (when known in advance)
+        @param data_cb(None, callable): method to call on each data read/write
+            can be used to do processing like calculating hash.
+            if data_cb return a non None value, it will be used instead of the
+            data read/to write
+        @param auto_end_signals(bool): if True, progress_finished and progress_error signals
+            are automatically sent.
+            if False, you'll have to call self.progress_finished and self.progress_error
+            yourself.
+            progress_started signal is always sent automatically
+        @param check_size_with_read(bool): if True, size will be checked using number of
+            bytes read or written. This is useful when data_cb modifiy len of file.
+        @param pre_close_cb:
+        """
+        self.host = host
+        self.profile = client.profile
+        self.uid = uid or str(uuid.uuid4())
+        self._file = open(path, mode)
+        self.size = size
+        self.data_cb = data_cb
+        self.auto_end_signals = auto_end_signals
+        self.pre_close_cb = pre_close_cb
+        metadata = self.get_progress_metadata()
+        self.host.register_progress_cb(
+            self.uid, self.get_progress, metadata, profile=client.profile
+        )
+        self.host.bridge.progress_started(self.uid, metadata, client.profile)
+
+        self._transfer_count = 0 if check_size_with_read else None
+
+    @property
+    def check_size_with_read(self):
+        return self._transfer_count is not None
+
+    @check_size_with_read.setter
+    def check_size_with_read(self, value):
+        if value and self._transfer_count is None:
+            self._transfer_count = 0
+        else:
+            self._transfer_count = None
+
+    def check_size(self):
+        """Check that current size correspond to given size
+
+        must be used when the transfer is supposed to be finished
+        @return (bool): True if the position is the same as given size
+        @raise exceptions.NotFound: size has not be specified
+        """
+        if self.check_size_with_read:
+            position = self._transfer_count
+        else:
+            position = self._file.tell()
+        if self.size is None:
+            raise exceptions.NotFound
+        return position == self.size
+
+    def close(self, progress_metadata=None, error=None):
+        """Close the current file
+
+        @param progress_metadata(None, dict): metadata to send with _onProgressFinished
+            message
+        @param error(None, unicode): set to an error message if progress was not
+            successful
+            mutually exclusive with progress_metadata
+            error can happen even if error is None, if current size differ from given size
+        """
+        if self._file.closed:
+            return  # avoid double close (which is allowed) error
+        if self.pre_close_cb is not None:
+            self.pre_close_cb()
+        if error is None:
+            try:
+                size_ok = self.check_size()
+            except exceptions.NotFound:
+                size_ok = True
+            if not size_ok:
+                error = "declared and actual size mismatch"
+                log.warning(error)
+                progress_metadata = None
+
+        self._file.close()
+
+        if self.auto_end_signals:
+            if error is None:
+                self.progress_finished(progress_metadata)
+            else:
+                assert progress_metadata is None
+                self.progress_error(error)
+
+        self.host.remove_progress_cb(self.uid, self.profile)
+        if error is not None:
+            log.error(f"file {self._file} closed with an error: {error}")
+
+    @property
+    def closed(self):
+        return self._file.closed
+
+    def progress_finished(self, metadata=None):
+        if metadata is None:
+            metadata = {}
+        self.host.bridge.progress_finished(self.uid, metadata, self.profile)
+
+    def progress_error(self, error):
+        self.host.bridge.progress_error(self.uid, error, self.profile)
+
+    def flush(self):
+        self._file.flush()
+
+    def write(self, buf):
+        if self.data_cb is not None:
+            ret = self.data_cb(buf)
+            if ret is not None:
+                buf = ret
+        if self._transfer_count is not None:
+            self._transfer_count += len(buf)
+        self._file.write(buf)
+
+    def read(self, size=-1):
+        read = self._file.read(size)
+        if self.data_cb is not None:
+            ret = self.data_cb(read)
+            if ret is not None:
+                read = ret
+        if self._transfer_count is not None:
+            self._transfer_count += len(read)
+        return read
+
+    def seek(self, offset, whence=os.SEEK_SET):
+        self._file.seek(offset, whence)
+
+    def tell(self):
+        return self._file.tell()
+
+    @property
+    def mode(self):
+        return self._file.mode
+
+    def get_progress_metadata(self):
+        """Return progression metadata as given to progress_started
+
+        @return (dict): metadata (check bridge for documentation)
+        """
+        metadata = {"type": C.META_TYPE_FILE}
+
+        mode = self._file.mode
+        if "+" in mode:
+            pass  # we have no direction in read/write modes
+        elif mode in ("r", "rb"):
+            metadata["direction"] = "out"
+        elif mode in ("w", "wb"):
+            metadata["direction"] = "in"
+        elif "U" in mode:
+            metadata["direction"] = "out"
+        else:
+            raise exceptions.InternalError
+
+        metadata["name"] = self._file.name
+
+        return metadata
+
+    def get_progress(self, progress_id, profile):
+        ret = {"position": self._file.tell()}
+        if self.size:
+            ret["size"] = self.size
+        return ret
+
+
+@interface.implementer(IStreamProducer)
+@interface.implementer(interfaces.IConsumer)
+class FileStreamObject(basic.FileSender):
+    def __init__(self, host, client, path, **kwargs):
+        """
+
+        A SatFile will be created and put in self.file_obj
+        @param path(unicode): path to the file
+        @param **kwargs: kw arguments to pass to SatFile
+        """
+        self.file_obj = SatFile(host, client, path, **kwargs)
+
+    def registerProducer(self, producer, streaming):
+        pass
+
+    def start_stream(self, consumer):
+        return self.beginFileTransfer(self.file_obj, consumer)
+
+    def write(self, data):
+        self.file_obj.write(data)
+
+    def close(self, *args, **kwargs):
+        self.file_obj.close(*args, **kwargs)