Mercurial > libervia-backend
comparison libervia/backend/tools/stream.py @ 4071:4b842c1fb686
refactoring: renamed `sat` package to `libervia.backend`
author | Goffi <goffi@goffi.org> |
---|---|
date | Fri, 02 Jun 2023 11:49:51 +0200 |
parents | sat/tools/stream.py@524856bd7b19 |
children | 040095a5dc7f |
comparison
equal
deleted
inserted
replaced
4070:d10748475025 | 4071:4b842c1fb686 |
---|---|
1 #!/usr/bin/env python3 | |
2 | |
3 # Libervia: an XMPP client | |
4 # Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org) | |
5 | |
6 # This program is free software: you can redistribute it and/or modify | |
7 # it under the terms of the GNU Affero General Public License as published by | |
8 # the Free Software Foundation, either version 3 of the License, or | |
9 # (at your option) any later version. | |
10 | |
11 # This program is distributed in the hope that it will be useful, | |
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of | |
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |
14 # GNU Affero General Public License for more details. | |
15 | |
16 # You should have received a copy of the GNU Affero General Public License | |
17 # along with this program. If not, see <http://www.gnu.org/licenses/>. | |
18 | |
19 """ interfaces """ | |
20 | |
21 from argparse import OPTIONAL | |
22 from pathlib import Path | |
23 from typing import Callable, Optional, Union | |
24 import uuid | |
25 import os | |
26 from zope import interface | |
27 from libervia.backend.core import exceptions | |
28 from libervia.backend.core.constants import Const as C | |
29 from libervia.backend.core.core_types import SatXMPPEntity | |
30 from libervia.backend.core.log import getLogger | |
31 from twisted.protocols import basic | |
32 from twisted.internet import interfaces | |
33 | |
34 from libervia.backend.core.sat_main import SAT | |
35 | |
36 log = getLogger(__name__) | |
37 | |
38 | |
class IStreamProducer(interface.Interface):
    """Interface of objects able to produce a stream for a consumer"""

    def start_stream(consumer):
        """Start producing the stream

        @param consumer: consumer the stream is produced for
        @return (D): deferred fired when stream is finished
        """
46 | |
47 | |
class SatFile:
    """A file-like object to have high level files manipulation

    The wrapped file is opened on instantiation, and its progression is registered
    to the host, so it can be followed with the uid of the instance.
    """

    # TODO: manage "with" statement

    def __init__(
        self,
        host: SAT,
        client: SatXMPPEntity,
        path: Union[str, Path],
        mode: str = "rb",
        uid: Optional[str] = None,
        size: Optional[int] = None,
        data_cb: Optional[Callable] = None,
        auto_end_signals: bool = True,
        check_size_with_read: bool = False,
        pre_close_cb: Optional[Callable] = None,
    ) -> None:
        """
        @param host: %(doc_host)s
        @param client: entity whose profile is used for progress callbacks and signals
        @param path(Path, str): path to the file to get or write to
        @param mode(str): same as for built-in "open" function
        @param uid(unicode, None): unique id identifying this progressing element
            This uid will be used with self.host.progress_get
            will be automatically generated if None
        @param size(None, int): size of the file (when known in advance)
        @param data_cb(None, callable): method to call on each data read/write
            can be used to do processing like calculating hash.
            if data_cb return a non None value, it will be used instead of the
            data read/to write
        @param auto_end_signals(bool): if True, progress_finished and progress_error
            signals are automatically sent.
            if False, you'll have to call self.progress_finished and
            self.progress_error yourself.
            progress_started signal is always sent automatically
        @param check_size_with_read(bool): if True, size will be checked using number
            of bytes read or written. This is useful when data_cb modify len of file.
        @param pre_close_cb(None, callable): method called without argument by
            self.close, just before the size check and the actual closing
        @raise exceptions.InternalError: direction can't be deduced from mode
        """
        self.host = host
        self.profile = client.profile
        self.uid = uid or str(uuid.uuid4())
        self.size = size
        self.data_cb = data_cb
        self.auto_end_signals = auto_end_signals
        self.pre_close_cb = pre_close_cb
        # None means that transferred bytes are not counted (file position is then
        # used to check size)
        self._transfer_count = 0 if check_size_with_read else None
        self._file = open(path, mode)
        try:
            metadata = self.get_progress_metadata()
        except Exception:
            # the instance will never be returned to the caller, so nobody would be
            # able to close the file: we must not leak the handle
            self._file.close()
            raise
        self.host.register_progress_cb(
            self.uid, self.get_progress, metadata, profile=client.profile
        )
        self.host.bridge.progress_started(self.uid, metadata, client.profile)

    @property
    def check_size_with_read(self):
        """True if size is checked with the count of bytes read/written"""
        return self._transfer_count is not None

    @check_size_with_read.setter
    def check_size_with_read(self, value):
        if not value:
            self._transfer_count = None
        elif self._transfer_count is None:
            # counting starts now; if it was already activated, the current count
            # is kept untouched
            self._transfer_count = 0

    def check_size(self):
        """Check that current size correspond to given size

        must be used when the transfer is supposed to be finished
        @return (bool): True if the position is the same as given size
        @raise exceptions.NotFound: size has not be specified
        """
        if self.check_size_with_read:
            position = self._transfer_count
        else:
            position = self._file.tell()
        if self.size is None:
            raise exceptions.NotFound
        return position == self.size

    def close(self, progress_metadata=None, error=None):
        """Close the current file

        Nothing is done if the file is already closed.
        @param progress_metadata(None, dict): metadata to send with _onProgressFinished
            message
        @param error(None, unicode): set to an error message if progress was not
            successful
            mutually exclusive with progress_metadata
            error can happen even if error is None, if current size differ from given size
        """
        if self._file.closed:
            return  # avoid double close (which is allowed) error
        if self.pre_close_cb is not None:
            self.pre_close_cb()
        if error is None:
            try:
                size_ok = self.check_size()
            except exceptions.NotFound:
                # no size has been declared, there is nothing to compare with
                size_ok = True
            if not size_ok:
                error = "declared and actual size mismatch"
                log.warning(error)
                progress_metadata = None

        self._file.close()

        if self.auto_end_signals:
            if error is None:
                self.progress_finished(progress_metadata)
            else:
                assert progress_metadata is None
                self.progress_error(error)

        self.host.remove_progress_cb(self.uid, self.profile)
        if error is not None:
            log.error(f"file {self._file} closed with an error: {error}")

    @property
    def closed(self):
        """True if the wrapped file is closed"""
        return self._file.closed

    def progress_finished(self, metadata=None):
        """Send the progress_finished signal for this file

        @param metadata(None, dict): metadata to send with the signal
            an empty dict is sent if None
        """
        if metadata is None:
            metadata = {}
        self.host.bridge.progress_finished(self.uid, metadata, self.profile)

    def progress_error(self, error):
        """Send the progress_error signal for this file

        @param error(unicode): error message
        """
        self.host.bridge.progress_error(self.uid, error, self.profile)

    def flush(self):
        """Flush the wrapped file"""
        self._file.flush()

    def write(self, buf):
        """Write data to the file, after processing by data_cb if it is set

        @param buf: data to write
        @return: value returned by the "write" method of the wrapped file
        """
        if self.data_cb is not None:
            ret = self.data_cb(buf)
            if ret is not None:
                buf = ret
        if self._transfer_count is not None:
            # we count what is actually written, i.e. after data_cb processing
            self._transfer_count += len(buf)
        return self._file.write(buf)

    def read(self, size=-1):
        """Read data from the file, and process it with data_cb if it is set

        @param size(int): same as for the "read" method of the wrapped file
        @return: data read, or the value returned by data_cb if it is not None
        """
        read = self._file.read(size)
        if self.data_cb is not None:
            ret = self.data_cb(read)
            if ret is not None:
                read = ret
        if self._transfer_count is not None:
            self._transfer_count += len(read)
        return read

    def seek(self, offset, whence=os.SEEK_SET):
        """Change position in the wrapped file

        @param offset(int): same as for the "seek" method of the wrapped file
        @param whence(int): same as for the "seek" method of the wrapped file
        @return: value returned by the "seek" method of the wrapped file
        """
        return self._file.seek(offset, whence)

    def tell(self):
        """Return the current position in the wrapped file"""
        return self._file.tell()

    @property
    def mode(self):
        """Mode of the wrapped file"""
        return self._file.mode

    def get_progress_metadata(self):
        """Return progression metadata as given to progress_started

        @return (dict): metadata (check bridge for documentation)
        @raise exceptions.InternalError: direction can't be deduced from file mode
        """
        metadata = {"type": C.META_TYPE_FILE}

        mode = self._file.mode
        if "+" in mode:
            pass  # we have no direction in read/write modes
        elif "r" in mode or "U" in mode:
            # file is read
            metadata["direction"] = "out"
        elif "w" in mode or "a" in mode or "x" in mode:
            # file is written (truncate, append or exclusive creation)
            metadata["direction"] = "in"
        else:
            raise exceptions.InternalError

        metadata["name"] = self._file.name

        return metadata

    def get_progress(self, progress_id, profile):
        """Progress callback, as registered with host.register_progress_cb

        @param progress_id: not used by this method
        @param profile: not used by this method
        @return (dict): current "position" in the file, and "size" if it is set
        """
        ret = {"position": self._file.tell()}
        if self.size:
            ret["size"] = self.size
        return ret
238 | |
239 | |
@interface.implementer(IStreamProducer)
@interface.implementer(interfaces.IConsumer)
class FileStreamObject(basic.FileSender):
    """Stream object working on a SatFile

    It can be used as a producer (with start_stream, the file is then sent to the
    consumer), or as a consumer (data given to write are written to the file).
    """

    def __init__(self, host, client, path, **kwargs):
        """

        A SatFile will be created and put in self.file_obj
        @param host: passed as is to SatFile
        @param client: passed as is to SatFile
        @param path(unicode): path to the file
        @param **kwargs: kw arguments to pass to SatFile
        """
        self.file_obj = SatFile(host, client, path, **kwargs)

    def registerProducer(self, producer, streaming):
        """IConsumer method: the producer is not kept, nothing is done"""
        pass

    def unregisterProducer(self):
        """IConsumer method: nothing to do as the producer is not kept

        This method is part of IConsumer, and is called by producers such as
        twisted's FileSender on their consumer when all data have been sent.
        """
        pass

    def start_stream(self, consumer):
        """Send the file to the consumer

        @param consumer: consumer which will receive the data of the file
        @return (D): deferred returned by beginFileTransfer
        """
        return self.beginFileTransfer(self.file_obj, consumer)

    def write(self, data):
        """Write data to the SatFile

        @param data: data to write
        """
        self.file_obj.write(data)

    def close(self, *args, **kwargs):
        """Close the SatFile

        arguments are passed as is to SatFile.close
        """
        self.file_obj.close(*args, **kwargs)