Mercurial > libervia-backend
comparison src/plugins/plugin_misc_file.py @ 2489:e2a7bb875957
plugin pipe/stream, file transfert: refactoring and improvments:
this is a big patch as things had to be changed at the same time.
- changed methods using profile argument to use client instead
- move SatFile in a new tools.stream module, has it should be part of core, not a plugin
- new IStreamProducer interface, to handler starting a pull producer
- new FileStreamObject which create a stream producer/consumer from a SatFile
- plugin pipe is no more using unix named pipe, as it complicate the thing,
special care need to be taken to not block, and it's generally not necessary.
Instead a socket is now used, so the plugin has been renomed to jingle stream.
- bad connection/error should be better handler in jingle stream plugin, and code should not block anymore
- jp pipe commands have been updated accordingly
fix bug 237
author | Goffi <goffi@goffi.org> |
---|---|
date | Thu, 08 Feb 2018 00:37:42 +0100 |
parents | 0046283a285d |
children | 7ad5f2c4e34a |
comparison
equal
deleted
inserted
replaced
2488:78c7992a26ed | 2489:e2a7bb875957 |
---|---|
21 from sat.core.constants import Const as C | 21 from sat.core.constants import Const as C |
22 from sat.core.log import getLogger | 22 from sat.core.log import getLogger |
23 log = getLogger(__name__) | 23 log = getLogger(__name__) |
24 from sat.core import exceptions | 24 from sat.core import exceptions |
25 from sat.tools import xml_tools | 25 from sat.tools import xml_tools |
26 from sat.tools import stream | |
26 from twisted.internet import defer | 27 from twisted.internet import defer |
27 from twisted.words.protocols.jabber import jid | 28 from twisted.words.protocols.jabber import jid |
28 import os | 29 import os |
29 import os.path | 30 import os.path |
30 import uuid | |
31 | 31 |
32 | 32 |
33 PLUGIN_INFO = { | 33 PLUGIN_INFO = { |
34 C.PI_NAME: "File Tansfer", | 34 C.PI_NAME: "File Tansfer", |
35 C.PI_IMPORT_NAME: "FILE", | 35 C.PI_IMPORT_NAME: "FILE", |
50 SECURITY_LIMIT = 30 | 50 SECURITY_LIMIT = 30 |
51 | 51 |
52 PROGRESS_ID_KEY = 'progress_id' | 52 PROGRESS_ID_KEY = 'progress_id' |
53 | 53 |
54 | 54 |
55 class SatFile(object): | |
56 """A file-like object to have high level files manipulation""" | |
57 # TODO: manage "with" statement | |
58 | |
59 def __init__(self, host, path, mode='rb', uid=None, size=None, data_cb=None, auto_end_signals=True, profile=C.PROF_KEY_NONE): | |
60 """ | |
61 @param host: %(doc_host)s | |
62 @param path(str): path of the file to get | |
63 @param mode(str): same as for built-in "open" function | |
64 @param uid(unicode, None): unique id identifing this progressing element | |
65 This uid will be used with self.host.progressGet | |
66 will be automaticaly generated if None | |
67 @param size(None, int): size of the file | |
68 @param data_cb(None, callable): method to call on each data read/write | |
69 mainly useful to do things like calculating hash | |
70 @param auto_end_signals(bool): if True, progressFinished and progressError signals are automatically sent | |
71 if False, you'll have to call self.progressFinished and self.progressError yourself | |
72 progressStarted signal is always sent automatically | |
73 """ | |
74 self.host = host | |
75 self.uid = uid or unicode(uuid.uuid4()) | |
76 self._file = open(path, mode) | |
77 self.size = size | |
78 self.data_cb = data_cb | |
79 self.profile = profile | |
80 self.auto_end_signals = auto_end_signals | |
81 metadata = self.getProgressMetadata() | |
82 self.host.registerProgressCb(self.uid, self.getProgress, metadata, profile=profile) | |
83 self.host.bridge.progressStarted(self.uid, metadata, self.profile) | |
84 | |
85 def checkSize(self): | |
86 """Check that current size correspond to given size | |
87 | |
88 must be used when the transfer is supposed to be finished | |
89 @return (bool): True if the position is the same as given size | |
90 @raise exceptions.NotFound: size has not be specified | |
91 """ | |
92 position = self._file.tell() | |
93 if self.size is None: | |
94 raise exceptions.NotFound | |
95 return position == self.size | |
96 | |
97 | |
98 def close(self, progress_metadata=None, error=None): | |
99 """Close the current file | |
100 | |
101 @param progress_metadata(None, dict): metadata to send with _onProgressFinished message | |
102 @param error(None, unicode): set to an error message if progress was not successful | |
103 mutually exclusive with progress_metadata | |
104 error can happen even if error is None, if current size differ from given size | |
105 """ | |
106 if self._file.closed: | |
107 return # avoid double close (which is allowed) error | |
108 if error is None: | |
109 try: | |
110 size_ok = self.checkSize() | |
111 except exceptions.NotFound: | |
112 size_ok = True | |
113 if not size_ok: | |
114 error = u'declared and actual size mismatch' | |
115 log.warning(error) | |
116 progress_metadata = None | |
117 | |
118 self._file.close() | |
119 | |
120 if self.auto_end_signals: | |
121 if error is None: | |
122 self.progressFinished(progress_metadata) | |
123 else: | |
124 assert progress_metadata is None | |
125 self.progressError(error) | |
126 | |
127 self.host.removeProgressCb(self.uid, self.profile) | |
128 | |
129 def progressFinished(self, metadata=None): | |
130 if metadata is None: | |
131 metadata = {} | |
132 self.host.bridge.progressFinished(self.uid, metadata, self.profile) | |
133 | |
134 def progressError(self, error): | |
135 self.host.bridge.progressError(self.uid, error, self.profile) | |
136 | |
137 def flush(self): | |
138 self._file.flush() | |
139 | |
140 def write(self, buf): | |
141 self._file.write(buf) | |
142 if self.data_cb is not None: | |
143 return self.data_cb(buf) | |
144 | |
145 def read(self, size=-1): | |
146 read = self._file.read(size) | |
147 if self.data_cb is not None and read: | |
148 self.data_cb(read) | |
149 return read | |
150 | |
151 def seek(self, offset, whence=os.SEEK_SET): | |
152 self._file.seek(offset, whence) | |
153 | |
154 def tell(self): | |
155 return self._file.tell() | |
156 | |
157 def mode(self): | |
158 return self._file.mode() | |
159 | |
160 def getProgressMetadata(self): | |
161 """Return progression metadata as given to progressStarted | |
162 | |
163 @return (dict): metadata (check bridge for documentation) | |
164 """ | |
165 metadata = {'type': C.META_TYPE_FILE} | |
166 | |
167 mode = self._file.mode | |
168 if '+' in mode: | |
169 pass # we have no direction in read/write modes | |
170 elif mode in ('r', 'rb'): | |
171 metadata['direction'] = 'out' | |
172 elif mode in ('w', 'wb'): | |
173 metadata['direction'] = 'in' | |
174 elif 'U' in mode: | |
175 metadata['direction'] = 'out' | |
176 else: | |
177 raise exceptions.InternalError | |
178 | |
179 metadata['name'] = self._file.name | |
180 | |
181 return metadata | |
182 | |
183 def getProgress(self, progress_id, profile): | |
184 ret = {'position': self._file.tell()} | |
185 if self.size: | |
186 ret['size'] = self.size | |
187 return ret | |
188 | |
189 | |
190 class FilePlugin(object): | 55 class FilePlugin(object): |
191 File=SatFile | 56 File=stream.SatFile |
192 | 57 |
193 def __init__(self, host): | 58 def __init__(self, host): |
194 log.info(_("plugin File initialization")) | 59 log.info(_("plugin File initialization")) |
195 self.host = host | 60 self.host = host |
196 host.bridge.addMethod("fileSend", ".plugin", in_sign='sssss', out_sign='a{ss}', method=self._fileSend, async=True) | 61 host.bridge.addMethod("fileSend", ".plugin", in_sign='sssss', out_sign='a{ss}', method=self._fileSend, async=True) |
197 self._file_callbacks = [] | 62 self._file_callbacks = [] |
198 host.importMenu((D_("Action"), D_("send file")), self._fileSendMenu, security_limit=10, help_string=D_("Send a file"), type_=C.MENU_SINGLE) | 63 host.importMenu((D_("Action"), D_("send file")), self._fileSendMenu, security_limit=10, help_string=D_("Send a file"), type_=C.MENU_SINGLE) |
199 | 64 |
200 def _fileSend(self, peer_jid_s, filepath, name="", file_desc="", profile=C.PROF_KEY_NONE): | 65 def _fileSend(self, peer_jid_s, filepath, name="", file_desc="", profile=C.PROF_KEY_NONE): |
201 return self.fileSend(jid.JID(peer_jid_s), filepath, name or None, file_desc or None, profile) | 66 client = self.host.getClient(profile) |
67 return self.fileSend(client, jid.JID(peer_jid_s), filepath, name or None, file_desc or None) | |
202 | 68 |
203 @defer.inlineCallbacks | 69 @defer.inlineCallbacks |
204 def fileSend(self, peer_jid, filepath, filename=None, file_desc=None, profile=C.PROF_KEY_NONE): | 70 def fileSend(self, client, peer_jid, filepath, filename=None, file_desc=None): |
205 """Send a file using best available method | 71 """Send a file using best available method |
206 | 72 |
207 @param peer_jid(jid.JID): jid of the destinee | 73 @param peer_jid(jid.JID): jid of the destinee |
208 @param filepath(str): absolute path to the file | 74 @param filepath(str): absolute path to the file |
209 @param filename(unicode, None): name to use, or None to find it from filepath | 75 @param filename(unicode, None): name to use, or None to find it from filepath |
210 @param file_desc(unicode, None): description of the file | 76 @param file_desc(unicode, None): description of the file |
211 @param profile: %(doc_profile)s | 77 @param profile: %(doc_profile)s |
212 @return (dict): action dictionary, with progress id in case of success, else xmlui message | 78 @return (dict): action dictionary, with progress id in case of success, else xmlui message |
213 """ | 79 """ |
214 client = self.host.getClient(profile) | |
215 if not os.path.isfile(filepath): | 80 if not os.path.isfile(filepath): |
216 raise exceptions.DataError(u"The given path doesn't link to a file") | 81 raise exceptions.DataError(u"The given path doesn't link to a file") |
217 if not filename: | 82 if not filename: |
218 filename = os.path.basename(filepath) or '_' | 83 filename = os.path.basename(filepath) or '_' |
219 for namespace, callback, priority, method_name in self._file_callbacks: | 84 for namespace, callback, priority, method_name in self._file_callbacks: |
220 has_feature = yield self.host.hasFeature(client, namespace, peer_jid) | 85 has_feature = yield self.host.hasFeature(client, namespace, peer_jid) |
221 if has_feature: | 86 if has_feature: |
222 log.info(u"{name} method will be used to send the file".format(name=method_name)) | 87 log.info(u"{name} method will be used to send the file".format(name=method_name)) |
223 progress_id = yield callback(peer_jid, filepath, filename, file_desc, profile) | 88 progress_id = yield callback(client, peer_jid, filepath, filename, file_desc) |
224 defer.returnValue({'progress': progress_id}) | 89 defer.returnValue({'progress': progress_id}) |
225 msg = u"Can't find any method to send file to {jid}".format(jid=peer_jid.full()) | 90 msg = u"Can't find any method to send file to {jid}".format(jid=peer_jid.full()) |
226 log.warning(msg) | 91 log.warning(msg) |
227 defer.returnValue({'xmlui': xml_tools.note(u"Can't transfer file", msg, C.XMLUI_DATA_LVL_WARNING).toXml()}) | 92 defer.returnValue({'xmlui': xml_tools.note(u"Can't transfer file", msg, C.XMLUI_DATA_LVL_WARNING).toXml()}) |
228 | 93 |
229 def _onFileChoosed(self, peer_jid, data, profile): | 94 def _onFileChoosed(self, client, peer_jid, data): |
230 cancelled = C.bool(data.get("cancelled", C.BOOL_FALSE)) | 95 cancelled = C.bool(data.get("cancelled", C.BOOL_FALSE)) |
231 if cancelled: | 96 if cancelled: |
232 return | 97 return |
233 path=data['path'] | 98 path=data['path'] |
234 return self.fileSend(peer_jid, path, profile=profile) | 99 return self.fileSend(client, peer_jid, path) |
235 | 100 |
236 def _fileSendMenu(self, data, profile): | 101 def _fileSendMenu(self, data, profile): |
237 """ XMLUI activated by menu: return file sending UI | 102 """ XMLUI activated by menu: return file sending UI |
238 | 103 |
239 @param profile: %(doc_profile)s | 104 @param profile: %(doc_profile)s |
241 try: | 106 try: |
242 jid_ = jid.JID(data['jid']) | 107 jid_ = jid.JID(data['jid']) |
243 except RuntimeError: | 108 except RuntimeError: |
244 raise exceptions.DataError(_("Invalid JID")) | 109 raise exceptions.DataError(_("Invalid JID")) |
245 | 110 |
246 file_choosed_id = self.host.registerCallback(lambda data, profile: self._onFileChoosed(jid_, data, profile), with_data=True, one_shot=True) | 111 file_choosed_id = self.host.registerCallback(lambda data, profile: self._onFileChoosed(self.host.getClient(profile), jid_, data), with_data=True, one_shot=True) |
247 xml_ui = xml_tools.XMLUI( | 112 xml_ui = xml_tools.XMLUI( |
248 C.XMLUI_DIALOG, | 113 C.XMLUI_DIALOG, |
249 dialog_opt = { | 114 dialog_opt = { |
250 C.XMLUI_DATA_TYPE: C.XMLUI_DIALOG_FILE, | 115 C.XMLUI_DATA_TYPE: C.XMLUI_DIALOG_FILE, |
251 C.XMLUI_DATA_MESS: _(SENDING).format(peer=jid_.full())}, | 116 C.XMLUI_DATA_MESS: _(SENDING).format(peer=jid_.full())}, |
276 raise exceptions.NotFound(u"The namespace to unregister doesn't exist") | 141 raise exceptions.NotFound(u"The namespace to unregister doesn't exist") |
277 | 142 |
278 # Dialogs with user | 143 # Dialogs with user |
279 # the overwrite check is done here | 144 # the overwrite check is done here |
280 | 145 |
281 def _openFileWrite(self, file_path, transfer_data, file_data, profile): | 146 def _openFileWrite(self, client, file_path, transfer_data, file_data, stream_object): |
282 assert 'file_obj' not in transfer_data | 147 if stream_object: |
283 transfer_data['file_obj'] = SatFile( | 148 assert 'stream_object' not in transfer_data |
284 self.host, | 149 transfer_data['stream_object'] = stream.FileStreamObject( |
285 file_path, | 150 self.host, |
286 'wb', | 151 client, |
287 uid=file_data[PROGRESS_ID_KEY], | 152 file_path, |
288 size=file_data['size'], | 153 mode='wb', |
289 data_cb = file_data.get('data_cb'), | 154 uid=file_data[PROGRESS_ID_KEY], |
290 profile=profile, | 155 size=file_data['size'], |
291 ) | 156 data_cb = file_data.get('data_cb'), |
292 | 157 ) |
293 def _gotConfirmation(self, data, peer_jid, transfer_data, file_data, profile): | 158 else: |
159 assert 'file_obj' not in transfer_data | |
160 transfer_data['file_obj'] = stream.SatFile( | |
161 self.host, | |
162 client, | |
163 file_path, | |
164 mode='wb', | |
165 uid=file_data[PROGRESS_ID_KEY], | |
166 size=file_data['size'], | |
167 data_cb = file_data.get('data_cb'), | |
168 ) | |
169 | |
170 def _gotConfirmation(self, data, client, peer_jid, transfer_data, file_data, stream_object): | |
294 """Called when the permission and dest path have been received | 171 """Called when the permission and dest path have been received |
295 | 172 |
296 @param peer_jid(jid.JID): jid of the file sender | 173 @param peer_jid(jid.JID): jid of the file sender |
297 @param transfer_data(dict): same as for [self.getDestDir] | 174 @param transfer_data(dict): same as for [self.getDestDir] |
298 @param file_data(dict): same as for [self.getDestDir] | 175 @param file_data(dict): same as for [self.getDestDir] |
299 @param profile: %(doc_profile)s | 176 @param stream_object(bool): same as for [self.getDestDir] |
300 return (bool): True if copy is wanted and OK | 177 return (bool): True if copy is wanted and OK |
301 False if user wants to cancel | 178 False if user wants to cancel |
302 if file exists ask confirmation and call again self._getDestDir if needed | 179 if file exists ask confirmation and call again self._getDestDir if needed |
303 """ | 180 """ |
304 if data.get('cancelled', False): | 181 if data.get('cancelled', False): |
309 | 186 |
310 # we manage case where file already exists | 187 # we manage case where file already exists |
311 if os.path.exists(file_path): | 188 if os.path.exists(file_path): |
312 def check_overwrite(overwrite): | 189 def check_overwrite(overwrite): |
313 if overwrite: | 190 if overwrite: |
314 self._openFileWrite(file_path, transfer_data, file_data, profile) | 191 self._openFileWrite(client, file_path, transfer_data, file_data, stream_object) |
315 return True | 192 return True |
316 else: | 193 else: |
317 return self.getDestDir(peer_jid, transfer_data, file_data, profile) | 194 return self.getDestDir(client, peer_jid, transfer_data, file_data) |
318 | 195 |
319 exists_d = xml_tools.deferConfirm( | 196 exists_d = xml_tools.deferConfirm( |
320 self.host, | 197 self.host, |
321 _(CONFIRM_OVERWRITE).format(file_path), | 198 _(CONFIRM_OVERWRITE).format(file_path), |
322 _(CONFIRM_OVERWRITE_TITLE), | 199 _(CONFIRM_OVERWRITE_TITLE), |
323 action_extra={'meta_from_jid': peer_jid.full(), | 200 action_extra={'meta_from_jid': peer_jid.full(), |
324 'meta_type': C.META_TYPE_OVERWRITE, | 201 'meta_type': C.META_TYPE_OVERWRITE, |
325 'meta_progress_id': file_data[PROGRESS_ID_KEY] | 202 'meta_progress_id': file_data[PROGRESS_ID_KEY] |
326 }, | 203 }, |
327 security_limit=SECURITY_LIMIT, | 204 security_limit=SECURITY_LIMIT, |
328 profile=profile) | 205 profile=client.profile) |
329 exists_d.addCallback(check_overwrite) | 206 exists_d.addCallback(check_overwrite) |
330 return exists_d | 207 return exists_d |
331 | 208 |
332 self._openFileWrite(file_path, transfer_data, file_data, profile) | 209 self._openFileWrite(client, file_path, transfer_data, file_data, stream_object) |
333 return True | 210 return True |
334 | 211 |
335 def getDestDir(self, peer_jid, transfer_data, file_data, profile): | 212 def getDestDir(self, client, peer_jid, transfer_data, file_data, stream_object=False): |
336 """Request confirmation and destination dir to user | 213 """Request confirmation and destination dir to user |
337 | 214 |
338 Overwrite confirmation is managed. | 215 Overwrite confirmation is managed. |
339 if transfer is confirmed, 'file_obj' is added to transfer_data | 216 if transfer is confirmed, 'file_obj' is added to transfer_data |
340 @param peer_jid(jid.JID): jid of the file sender | 217 @param peer_jid(jid.JID): jid of the file sender |
341 @param filename(unicode): name of the file | 218 @param filename(unicode): name of the file |
342 @param transfer_data(dict): data of the transfer session, | 219 @param transfer_data(dict): data of the transfer session, |
343 it will be only used to store the file_obj. | 220 it will be only used to store the file_obj. |
344 "file_obj" key *MUST NOT* exist before using getDestDir | 221 "file_obj" (or "stream_object") key *MUST NOT* exist before using getDestDir |
345 @param file_data(dict): information about the file to be transfered | 222 @param file_data(dict): information about the file to be transfered |
346 It MUST contain the following keys: | 223 It MUST contain the following keys: |
347 - peer_jid (jid.JID): other peer jid | 224 - peer_jid (jid.JID): other peer jid |
348 - name (unicode): name of the file to trasnsfer | 225 - name (unicode): name of the file to trasnsfer |
349 the name must not be empty or contain a "/" character | 226 the name must not be empty or contain a "/" character |
353 It *MUST NOT* contain the "peer" key | 230 It *MUST NOT* contain the "peer" key |
354 It may contain: | 231 It may contain: |
355 - data_cb (callable): method called on each data read/write | 232 - data_cb (callable): method called on each data read/write |
356 "file_path" will be added to this dict once destination selected | 233 "file_path" will be added to this dict once destination selected |
357 "size_human" will also be added with human readable file size | 234 "size_human" will also be added with human readable file size |
358 @param profile: %(doc_profile)s | 235 @param stream_object(bool): if True, a stream_object will be used instead of file_obj |
236 a stream.FileStreamObject will be used | |
359 return (defer.Deferred): True if transfer is accepted | 237 return (defer.Deferred): True if transfer is accepted |
360 """ | 238 """ |
361 filename = file_data['name'] | 239 filename = file_data['name'] |
362 assert filename and not '/' in filename | 240 assert filename and not '/' in filename |
363 assert PROGRESS_ID_KEY in file_data | 241 assert PROGRESS_ID_KEY in file_data |
371 action_extra={'meta_from_jid': peer_jid.full(), | 249 action_extra={'meta_from_jid': peer_jid.full(), |
372 'meta_type': C.META_TYPE_FILE, | 250 'meta_type': C.META_TYPE_FILE, |
373 'meta_progress_id': file_data[PROGRESS_ID_KEY] | 251 'meta_progress_id': file_data[PROGRESS_ID_KEY] |
374 }, | 252 }, |
375 security_limit=SECURITY_LIMIT, | 253 security_limit=SECURITY_LIMIT, |
376 profile=profile) | 254 profile=client.profile) |
377 d.addCallback(self._gotConfirmation, peer_jid, transfer_data, file_data, profile) | 255 d.addCallback(self._gotConfirmation, client, peer_jid, transfer_data, file_data, stream_object) |
378 return d | 256 return d |