Mercurial > libervia-backend
comparison sat/tools/stream.py @ 3087:a51f7fce1e2c
tools (stream): data modification on SatFile:
- if `data_cb` is used and it returns a non-None value, that value is used instead of the
data read from the file. This allows data to be modified on the fly, which is notably
useful for encryption
- new `check_size_with_read` argument which checks the size on `close()` using the amount
of data actually read/written instead of the file size. This avoids a warning when the
file is modified on the fly
- added `closed` attribute
author | Goffi <goffi@goffi.org> |
---|---|
date | Fri, 20 Dec 2019 12:28:04 +0100 |
parents | ab2696e34d29 |
children | 9d0df638c8b4 |
comparison
equal
deleted
inserted
replaced
3086:13be04a70e2f | 3087:a51f7fce1e2c |
---|---|
17 # You should have received a copy of the GNU Affero General Public License | 17 # You should have received a copy of the GNU Affero General Public License |
18 # along with this program. If not, see <http://www.gnu.org/licenses/>. | 18 # along with this program. If not, see <http://www.gnu.org/licenses/>. |
19 | 19 |
20 """ interfaces """ | 20 """ interfaces """ |
21 | 21 |
22 import uuid | |
23 import os | |
24 from zope import interface | |
22 from sat.core import exceptions | 25 from sat.core import exceptions |
23 from sat.core.constants import Const as C | 26 from sat.core.constants import Const as C |
24 from sat.core.log import getLogger | 27 from sat.core.log import getLogger |
25 from twisted.protocols import basic | 28 from twisted.protocols import basic |
26 from twisted.internet import interfaces | 29 from twisted.internet import interfaces |
27 from zope import interface | |
28 import uuid | |
29 import os | |
30 | 30 |
31 log = getLogger(__name__) | 31 log = getLogger(__name__) |
32 | 32 |
33 | 33 |
34 class IStreamProducer(interface.Interface): | 34 class IStreamProducer(interface.Interface): |
38 @return (D): deferred fired when stream is finished | 38 @return (D): deferred fired when stream is finished |
39 """ | 39 """ |
40 pass | 40 pass |
41 | 41 |
42 | 42 |
43 class SatFile(object): | 43 class SatFile: |
44 """A file-like object to have high level files manipulation""" | 44 """A file-like object to have high level files manipulation""" |
45 | 45 |
46 # TODO: manage "with" statement | 46 # TODO: manage "with" statement |
47 | 47 |
48 def __init__(self, host, client, path, mode="rb", uid=None, size=None, data_cb=None, | 48 def __init__(self, host, client, path, mode="rb", uid=None, size=None, data_cb=None, |
49 auto_end_signals=True): | 49 auto_end_signals=True, check_size_with_read=False): |
50 """ | 50 """ |
51 @param host: %(doc_host)s | 51 @param host: %(doc_host)s |
52 @param path(str): path of the file to get | 52 @param path(Path, str): path to the file to get or write to |
53 @param mode(str): same as for built-in "open" function | 53 @param mode(str): same as for built-in "open" function |
54 @param uid(unicode, None): unique id identifing this progressing element | 54 @param uid(unicode, None): unique id identifing this progressing element |
55 This uid will be used with self.host.progressGet | 55 This uid will be used with self.host.progressGet |
56 will be automaticaly generated if None | 56 will be automaticaly generated if None |
57 @param size(None, int): size of the file (when known in advance) | 57 @param size(None, int): size of the file (when known in advance) |
58 @param data_cb(None, callable): method to call on each data read/write | 58 @param data_cb(None, callable): method to call on each data read/write |
59 mainly useful to do things like calculating hash | 59 can be used to do processing like calculating hash. |
60 if data_cb return a non None value, it will be used instead of the | |
61 data read/to write | |
60 @param auto_end_signals(bool): if True, progressFinished and progressError signals | 62 @param auto_end_signals(bool): if True, progressFinished and progressError signals |
61 are automatically sent. | 63 are automatically sent. |
62 if False, you'll have to call self.progressFinished and self.progressError | 64 if False, you'll have to call self.progressFinished and self.progressError |
63 yourself. | 65 yourself. |
64 progressStarted signal is always sent automatically | 66 progressStarted signal is always sent automatically |
67 @param check_size_with_read(bool): if True, size well be checked using number of | |
68 bytes read or written. This is useful when data_cb modifiy len of file. | |
65 """ | 69 """ |
66 self.host = host | 70 self.host = host |
67 self.profile = client.profile | 71 self.profile = client.profile |
68 self.uid = uid or str(uuid.uuid4()) | 72 self.uid = uid or str(uuid.uuid4()) |
69 self._file = open(path, mode) | 73 self._file = open(path, mode) |
74 self.host.registerProgressCb( | 78 self.host.registerProgressCb( |
75 self.uid, self.getProgress, metadata, profile=client.profile | 79 self.uid, self.getProgress, metadata, profile=client.profile |
76 ) | 80 ) |
77 self.host.bridge.progressStarted(self.uid, metadata, client.profile) | 81 self.host.bridge.progressStarted(self.uid, metadata, client.profile) |
78 | 82 |
83 self._transfer_count = 0 if check_size_with_read else None | |
84 | |
85 @property | |
86 def check_size_with_read(self): | |
87 return self._transfer_count is not None | |
88 | |
89 @check_size_with_read.setter | |
90 def check_size_with_read(self, value): | |
91 if value and self._transfer_count is None: | |
92 self._transfer_count = 0 | |
93 else: | |
94 self._transfer_count = None | |
95 | |
79 def checkSize(self): | 96 def checkSize(self): |
80 """Check that current size correspond to given size | 97 """Check that current size correspond to given size |
81 | 98 |
82 must be used when the transfer is supposed to be finished | 99 must be used when the transfer is supposed to be finished |
83 @return (bool): True if the position is the same as given size | 100 @return (bool): True if the position is the same as given size |
84 @raise exceptions.NotFound: size has not be specified | 101 @raise exceptions.NotFound: size has not be specified |
85 """ | 102 """ |
86 position = self._file.tell() | 103 if self.check_size_with_read: |
104 position = self._transfer_count | |
105 else: | |
106 position = self._file.tell() | |
87 if self.size is None: | 107 if self.size is None: |
88 raise exceptions.NotFound | 108 raise exceptions.NotFound |
89 return position == self.size | 109 return position == self.size |
90 | 110 |
91 def close(self, progress_metadata=None, error=None): | 111 def close(self, progress_metadata=None, error=None): |
92 """Close the current file | 112 """Close the current file |
93 | 113 |
94 @param progress_metadata(None, dict): metadata to send with _onProgressFinished message | 114 @param progress_metadata(None, dict): metadata to send with _onProgressFinished |
95 @param error(None, unicode): set to an error message if progress was not successful | 115 message |
116 @param error(None, unicode): set to an error message if progress was not | |
117 successful | |
96 mutually exclusive with progress_metadata | 118 mutually exclusive with progress_metadata |
97 error can happen even if error is None, if current size differ from given size | 119 error can happen even if error is None, if current size differ from given size |
98 """ | 120 """ |
99 if self._file.closed: | 121 if self._file.closed: |
100 return # avoid double close (which is allowed) error | 122 return # avoid double close (which is allowed) error |
117 assert progress_metadata is None | 139 assert progress_metadata is None |
118 self.progressError(error) | 140 self.progressError(error) |
119 | 141 |
120 self.host.removeProgressCb(self.uid, self.profile) | 142 self.host.removeProgressCb(self.uid, self.profile) |
121 | 143 |
144 @property | |
145 def closed(self): | |
146 return self._file.closed | |
147 | |
122 def progressFinished(self, metadata=None): | 148 def progressFinished(self, metadata=None): |
123 if metadata is None: | 149 if metadata is None: |
124 metadata = {} | 150 metadata = {} |
125 self.host.bridge.progressFinished(self.uid, metadata, self.profile) | 151 self.host.bridge.progressFinished(self.uid, metadata, self.profile) |
126 | 152 |
129 | 155 |
130 def flush(self): | 156 def flush(self): |
131 self._file.flush() | 157 self._file.flush() |
132 | 158 |
133 def write(self, buf): | 159 def write(self, buf): |
160 if self.data_cb is not None: | |
161 ret = self.data_cb(buf) | |
162 if ret is not None: | |
163 buf = ret | |
164 if self._transfer_count is not None: | |
165 self._transfer_count += len(buf) | |
134 self._file.write(buf) | 166 self._file.write(buf) |
135 if self.data_cb is not None: | |
136 return self.data_cb(buf) | |
137 | 167 |
138 def read(self, size=-1): | 168 def read(self, size=-1): |
139 read = self._file.read(size) | 169 read = self._file.read(size) |
140 if self.data_cb is not None and read: | 170 if self.data_cb is not None: |
141 self.data_cb(read) | 171 ret = self.data_cb(read) |
172 if ret is not None: | |
173 read = ret | |
174 if self._transfer_count is not None: | |
175 self._transfer_count += len(read) | |
142 return read | 176 return read |
143 | 177 |
144 def seek(self, offset, whence=os.SEEK_SET): | 178 def seek(self, offset, whence=os.SEEK_SET): |
145 self._file.seek(offset, whence) | 179 self._file.seek(offset, whence) |
146 | 180 |