comparison libervia/backend/tools/stream.py @ 4071:4b842c1fb686

refactoring: renamed `sat` package to `libervia.backend`
author Goffi <goffi@goffi.org>
date Fri, 02 Jun 2023 11:49:51 +0200
parents sat/tools/stream.py@524856bd7b19
children 040095a5dc7f
comparison
equal deleted inserted replaced
4070:d10748475025 4071:4b842c1fb686
1 #!/usr/bin/env python3
2
3 # Libervia: an XMPP client
4 # Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org)
5
6 # This program is free software: you can redistribute it and/or modify
7 # it under the terms of the GNU Affero General Public License as published by
8 # the Free Software Foundation, either version 3 of the License, or
9 # (at your option) any later version.
10
11 # This program is distributed in the hope that it will be useful,
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 # GNU Affero General Public License for more details.
15
16 # You should have received a copy of the GNU Affero General Public License
17 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18
19 """ interfaces """
20
21 from argparse import OPTIONAL
22 from pathlib import Path
23 from typing import Callable, Optional, Union
24 import uuid
25 import os
26 from zope import interface
27 from libervia.backend.core import exceptions
28 from libervia.backend.core.constants import Const as C
29 from libervia.backend.core.core_types import SatXMPPEntity
30 from libervia.backend.core.log import getLogger
31 from twisted.protocols import basic
32 from twisted.internet import interfaces
33
34 from libervia.backend.core.sat_main import SAT
35
36 log = getLogger(__name__)
37
38
class IStreamProducer(interface.Interface):
    """Interface of objects able to produce a stream for a consumer"""

    def start_stream(consumer):
        """Start producing the stream

        @param consumer: consumer the stream is produced for
        @return (D): deferred fired when stream is finished
        """
46
47
class SatFile:
    """A file-like object to have high level files manipulation

    The instance wraps a file opened with the built-in ``open``. On creation a
    progress callback is registered on the host and the ``progress_started``
    bridge signal is sent; on ``close`` the callback is removed and, unless
    ``auto_end_signals`` is False, ``progress_finished`` or ``progress_error``
    is sent.
    """

    # TODO: manage "with" statement

    def __init__(
        self,
        host: SAT,
        client: SatXMPPEntity,
        path: Union[str, Path],
        mode: str = "rb",
        uid: Optional[str] = None,
        size: Optional[int] = None,
        data_cb: Optional[Callable] = None,
        auto_end_signals: bool = True,
        check_size_with_read: bool = False,
        pre_close_cb: Optional[Callable] = None,
    ) -> None:
        """
        @param host: %(doc_host)s
        @param client: entity whose profile is used for progress signals
        @param path(Path, str): path to the file to get or write to
        @param mode(str): same as for built-in "open" function
        @param uid(unicode, None): unique id identifying this progressing element
            This uid will be used with self.host.progress_get
            will be automatically generated if None
        @param size(None, int): size of the file (when known in advance)
        @param data_cb(None, callable): method to call on each data read/write
            can be used to do processing like calculating hash.
            if data_cb return a non None value, it will be used instead of the
            data read/to write
        @param auto_end_signals(bool): if True, progress_finished and progress_error
            signals are automatically sent.
            if False, you'll have to call self.progress_finished and
            self.progress_error yourself.
            progress_started signal is always sent automatically
        @param check_size_with_read(bool): if True, size will be checked using number
            of bytes read or written. This is useful when data_cb modify len of file.
        @param pre_close_cb(None, callable): called without argument at the very
            beginning of close(), before the size check and before the underlying
            file is closed. It is not called when the file is already closed.
        @raise exceptions.InternalError: the mode is not handled by
            get_progress_metadata
        """
        self.host = host
        self.profile = client.profile
        self.uid = uid or str(uuid.uuid4())
        self.size = size
        self.data_cb = data_cb
        self.auto_end_signals = auto_end_signals
        self.pre_close_cb = pre_close_cb
        # the counter is set before the instance is exposed through the progress
        # callback, so the object is never seen half-initialised
        self._transfer_count = 0 if check_size_with_read else None
        self._file = open(path, mode)
        try:
            metadata = self.get_progress_metadata()
        except Exception:
            # don't leak the file descriptor if the mode is not supported
            self._file.close()
            raise
        self.host.register_progress_cb(
            self.uid, self.get_progress, metadata, profile=client.profile
        )
        self.host.bridge.progress_started(self.uid, metadata, client.profile)

    @property
    def check_size_with_read(self):
        """True if the size is checked with the number of bytes read/written"""
        return self._transfer_count is not None

    @check_size_with_read.setter
    def check_size_with_read(self, value):
        """Activate or deactivate counting of transferred bytes

        Activating while already active keeps the current count; deactivating
        discards it.
        """
        if value:
            if self._transfer_count is None:
                self._transfer_count = 0
            # else: already counting, the running total must not be lost
        else:
            self._transfer_count = None

    def check_size(self):
        """Check that current size correspond to given size

        must be used when the transfer is supposed to be finished
        @return (bool): True if the position is the same as given size
        @raise exceptions.NotFound: size has not be specified
        """
        if self.check_size_with_read:
            position = self._transfer_count
        else:
            position = self._file.tell()
        if self.size is None:
            raise exceptions.NotFound
        return position == self.size

    def close(self, progress_metadata=None, error=None):
        """Close the current file

        @param progress_metadata(None, dict): metadata to send with
            progress_finished signal
        @param error(None, unicode): set to an error message if progress was not
            successful
            mutually exclusive with progress_metadata
            error can happen even if error is None, if current size differ from
            given size
        """
        if self._file.closed:
            return  # avoid double close (which is allowed) error
        if self.pre_close_cb is not None:
            self.pre_close_cb()
        if error is None:
            try:
                size_ok = self.check_size()
            except exceptions.NotFound:
                # no size was declared, there is nothing to compare with
                size_ok = True
            if not size_ok:
                error = "declared and actual size mismatch"
                log.warning(error)
                progress_metadata = None

        self._file.close()

        if self.auto_end_signals:
            if error is None:
                self.progress_finished(progress_metadata)
            else:
                assert progress_metadata is None
                self.progress_error(error)

        self.host.remove_progress_cb(self.uid, self.profile)
        if error is not None:
            log.error(f"file {self._file} closed with an error: {error}")

    @property
    def closed(self):
        """True if the underlying file is closed"""
        return self._file.closed

    def progress_finished(self, metadata=None):
        """Send the progress_finished signal

        @param metadata(None, dict): metadata to send, an empty dict is used if None
        """
        if metadata is None:
            metadata = {}
        self.host.bridge.progress_finished(self.uid, metadata, self.profile)

    def progress_error(self, error):
        """Send the progress_error signal

        @param error(unicode): error message
        """
        self.host.bridge.progress_error(self.uid, error, self.profile)

    def flush(self):
        """Flush the underlying file"""
        self._file.flush()

    def write(self, buf):
        """Write data to the underlying file

        @param buf: data to write, given to data_cb first if it is set
        """
        if self.data_cb is not None:
            ret = self.data_cb(buf)
            if ret is not None:
                buf = ret
        if self._transfer_count is not None:
            # the count is done on data actually written (i.e. after data_cb)
            self._transfer_count += len(buf)
        self._file.write(buf)

    def read(self, size=-1):
        """Read data from the underlying file

        @param size(int): same as for the read method of the underlying file
        @return: data read, or the value returned by data_cb if it is not None
        """
        read = self._file.read(size)
        if self.data_cb is not None:
            ret = self.data_cb(read)
            if ret is not None:
                read = ret
        if self._transfer_count is not None:
            # the count is done on data actually returned (i.e. after data_cb)
            self._transfer_count += len(read)
        return read

    def seek(self, offset, whence=os.SEEK_SET):
        """Move the position of the underlying file

        @param offset(int): same as for the seek method of the underlying file
        @param whence(int): same as for the seek method of the underlying file
        @return: value returned by the seek method of the underlying file
        """
        return self._file.seek(offset, whence)

    def tell(self):
        """Return the current position in the underlying file"""
        return self._file.tell()

    @property
    def mode(self):
        """Mode of the underlying file"""
        return self._file.mode

    def get_progress_metadata(self):
        """Return progression metadata as given to progress_started

        @return (dict): metadata (check bridge for documentation)
        @raise exceptions.InternalError: the file mode is not handled
        """
        metadata = {"type": C.META_TYPE_FILE}

        mode = self._file.mode
        if "+" in mode:
            pass  # we have no direction in read/write modes
        elif mode in ("r", "rb"):
            metadata["direction"] = "out"
        elif mode in ("w", "wb"):
            metadata["direction"] = "in"
        elif "U" in mode:
            metadata["direction"] = "out"
        else:
            raise exceptions.InternalError

        metadata["name"] = self._file.name

        return metadata

    def get_progress(self, progress_id, profile):
        """Progress callback registered on the host

        @param progress_id: not used by this method
        @param profile: not used by this method
        @return (dict): "position" in the underlying file, and "size" when a non
            zero size has been specified
        """
        ret = {"position": self._file.tell()}
        if self.size:
            ret["size"] = self.size
        return ret
238
239
@interface.implementer(IStreamProducer)
@interface.implementer(interfaces.IConsumer)
class FileStreamObject(basic.FileSender):
    """Stream object working on a SatFile

    The instance can produce the content of the file (start_stream) and can
    receive data to write into it (write).
    """

    def __init__(self, host, client, path, **kwargs):
        """

        A SatFile will be created and put in self.file_obj
        @param host: given as is to SatFile
        @param client: given as is to SatFile
        @param path(unicode): path to the file
        @param **kwargs: kw arguments to pass to SatFile
        """
        self.file_obj = SatFile(host, client, path, **kwargs)

    def registerProducer(self, producer, streaming):
        """Accept a producer registration; the producer is not kept"""
        pass

    def unregisterProducer(self):
        """Accept a producer unregistration; nothing has to be undone

        This method is part of the IConsumer interface declared on this class:
        without it, a producer unregistering itself at the end of its stream
        would raise an AttributeError.
        """
        pass

    def start_stream(self, consumer):
        """Send the content of self.file_obj to the consumer

        @param consumer: consumer given to beginFileTransfer
        @return: value returned by beginFileTransfer
        """
        return self.beginFileTransfer(self.file_obj, consumer)

    def write(self, data):
        """Write received data to self.file_obj"""
        self.file_obj.write(data)

    def close(self, *args, **kwargs):
        """Close self.file_obj, arguments are given as is to SatFile.close"""
        self.file_obj.close(*args, **kwargs)