comparison sat/tools/stream.py @ 3087:a51f7fce1e2c

tools (stream): data modification on SatFile: - if `data_cb` is used and returns a non-None value, that value is used instead of the data read from (or to be written to) the file. This allows data to be modified on the fly, which is notably useful for encryption - new `check_size_with_read` argument, which checks the size on `close()` using the amount of data actually read/written instead of the file size. This avoids a warning when the file is modified on the fly - added `closed` attribute
author Goffi <goffi@goffi.org>
date Fri, 20 Dec 2019 12:28:04 +0100
parents ab2696e34d29
children 9d0df638c8b4
comparison
equal deleted inserted replaced
3086:13be04a70e2f 3087:a51f7fce1e2c
17 # You should have received a copy of the GNU Affero General Public License 17 # You should have received a copy of the GNU Affero General Public License
18 # along with this program. If not, see <http://www.gnu.org/licenses/>. 18 # along with this program. If not, see <http://www.gnu.org/licenses/>.
19 19
20 """ interfaces """ 20 """ interfaces """
21 21
22 import uuid
23 import os
24 from zope import interface
22 from sat.core import exceptions 25 from sat.core import exceptions
23 from sat.core.constants import Const as C 26 from sat.core.constants import Const as C
24 from sat.core.log import getLogger 27 from sat.core.log import getLogger
25 from twisted.protocols import basic 28 from twisted.protocols import basic
26 from twisted.internet import interfaces 29 from twisted.internet import interfaces
27 from zope import interface
28 import uuid
29 import os
30 30
31 log = getLogger(__name__) 31 log = getLogger(__name__)
32 32
33 33
34 class IStreamProducer(interface.Interface): 34 class IStreamProducer(interface.Interface):
38 @return (D): deferred fired when stream is finished 38 @return (D): deferred fired when stream is finished
39 """ 39 """
40 pass 40 pass
41 41
42 42
43 class SatFile(object): 43 class SatFile:
44 """A file-like object to have high level files manipulation""" 44 """A file-like object to have high level files manipulation"""
45 45
46 # TODO: manage "with" statement 46 # TODO: manage "with" statement
47 47
48 def __init__(self, host, client, path, mode="rb", uid=None, size=None, data_cb=None, 48 def __init__(self, host, client, path, mode="rb", uid=None, size=None, data_cb=None,
49 auto_end_signals=True): 49 auto_end_signals=True, check_size_with_read=False):
50 """ 50 """
51 @param host: %(doc_host)s 51 @param host: %(doc_host)s
52 @param path(str): path of the file to get 52 @param path(Path, str): path to the file to get or write to
53 @param mode(str): same as for built-in "open" function 53 @param mode(str): same as for built-in "open" function
54 @param uid(unicode, None): unique id identifing this progressing element 54 @param uid(unicode, None): unique id identifing this progressing element
55 This uid will be used with self.host.progressGet 55 This uid will be used with self.host.progressGet
56 will be automaticaly generated if None 56 will be automaticaly generated if None
57 @param size(None, int): size of the file (when known in advance) 57 @param size(None, int): size of the file (when known in advance)
58 @param data_cb(None, callable): method to call on each data read/write 58 @param data_cb(None, callable): method to call on each data read/write
59 mainly useful to do things like calculating hash 59 can be used to do processing like calculating hash.
60 if data_cb return a non None value, it will be used instead of the
61 data read/to write
60 @param auto_end_signals(bool): if True, progressFinished and progressError signals 62 @param auto_end_signals(bool): if True, progressFinished and progressError signals
61 are automatically sent. 63 are automatically sent.
62 if False, you'll have to call self.progressFinished and self.progressError 64 if False, you'll have to call self.progressFinished and self.progressError
63 yourself. 65 yourself.
64 progressStarted signal is always sent automatically 66 progressStarted signal is always sent automatically
67 @param check_size_with_read(bool): if True, size well be checked using number of
68 bytes read or written. This is useful when data_cb modifiy len of file.
65 """ 69 """
66 self.host = host 70 self.host = host
67 self.profile = client.profile 71 self.profile = client.profile
68 self.uid = uid or str(uuid.uuid4()) 72 self.uid = uid or str(uuid.uuid4())
69 self._file = open(path, mode) 73 self._file = open(path, mode)
74 self.host.registerProgressCb( 78 self.host.registerProgressCb(
75 self.uid, self.getProgress, metadata, profile=client.profile 79 self.uid, self.getProgress, metadata, profile=client.profile
76 ) 80 )
77 self.host.bridge.progressStarted(self.uid, metadata, client.profile) 81 self.host.bridge.progressStarted(self.uid, metadata, client.profile)
78 82
83 self._transfer_count = 0 if check_size_with_read else None
84
85 @property
86 def check_size_with_read(self):
87 return self._transfer_count is not None
88
89 @check_size_with_read.setter
90 def check_size_with_read(self, value):
91 if value and self._transfer_count is None:
92 self._transfer_count = 0
93 else:
94 self._transfer_count = None
95
79 def checkSize(self): 96 def checkSize(self):
80 """Check that current size correspond to given size 97 """Check that current size correspond to given size
81 98
82 must be used when the transfer is supposed to be finished 99 must be used when the transfer is supposed to be finished
83 @return (bool): True if the position is the same as given size 100 @return (bool): True if the position is the same as given size
84 @raise exceptions.NotFound: size has not be specified 101 @raise exceptions.NotFound: size has not be specified
85 """ 102 """
86 position = self._file.tell() 103 if self.check_size_with_read:
104 position = self._transfer_count
105 else:
106 position = self._file.tell()
87 if self.size is None: 107 if self.size is None:
88 raise exceptions.NotFound 108 raise exceptions.NotFound
89 return position == self.size 109 return position == self.size
90 110
91 def close(self, progress_metadata=None, error=None): 111 def close(self, progress_metadata=None, error=None):
92 """Close the current file 112 """Close the current file
93 113
94 @param progress_metadata(None, dict): metadata to send with _onProgressFinished message 114 @param progress_metadata(None, dict): metadata to send with _onProgressFinished
95 @param error(None, unicode): set to an error message if progress was not successful 115 message
116 @param error(None, unicode): set to an error message if progress was not
117 successful
96 mutually exclusive with progress_metadata 118 mutually exclusive with progress_metadata
97 error can happen even if error is None, if current size differ from given size 119 error can happen even if error is None, if current size differ from given size
98 """ 120 """
99 if self._file.closed: 121 if self._file.closed:
100 return # avoid double close (which is allowed) error 122 return # avoid double close (which is allowed) error
117 assert progress_metadata is None 139 assert progress_metadata is None
118 self.progressError(error) 140 self.progressError(error)
119 141
120 self.host.removeProgressCb(self.uid, self.profile) 142 self.host.removeProgressCb(self.uid, self.profile)
121 143
144 @property
145 def closed(self):
146 return self._file.closed
147
122 def progressFinished(self, metadata=None): 148 def progressFinished(self, metadata=None):
123 if metadata is None: 149 if metadata is None:
124 metadata = {} 150 metadata = {}
125 self.host.bridge.progressFinished(self.uid, metadata, self.profile) 151 self.host.bridge.progressFinished(self.uid, metadata, self.profile)
126 152
129 155
130 def flush(self): 156 def flush(self):
131 self._file.flush() 157 self._file.flush()
132 158
133 def write(self, buf): 159 def write(self, buf):
160 if self.data_cb is not None:
161 ret = self.data_cb(buf)
162 if ret is not None:
163 buf = ret
164 if self._transfer_count is not None:
165 self._transfer_count += len(buf)
134 self._file.write(buf) 166 self._file.write(buf)
135 if self.data_cb is not None:
136 return self.data_cb(buf)
137 167
138 def read(self, size=-1): 168 def read(self, size=-1):
139 read = self._file.read(size) 169 read = self._file.read(size)
140 if self.data_cb is not None and read: 170 if self.data_cb is not None:
141 self.data_cb(read) 171 ret = self.data_cb(read)
172 if ret is not None:
173 read = ret
174 if self._transfer_count is not None:
175 self._transfer_count += len(read)
142 return read 176 return read
143 177
144 def seek(self, offset, whence=os.SEEK_SET): 178 def seek(self, offset, whence=os.SEEK_SET):
145 self._file.seek(offset, whence) 179 self._file.seek(offset, whence)
146 180