comparison src/plugins/plugin_misc_file.py @ 2489:e2a7bb875957

plugin pipe/stream, file transfert: refactoring and improvments: this is a big patch as things had to be changed at the same time. - changed methods using profile argument to use client instead - move SatFile in a new tools.stream module, has it should be part of core, not a plugin - new IStreamProducer interface, to handler starting a pull producer - new FileStreamObject which create a stream producer/consumer from a SatFile - plugin pipe is no more using unix named pipe, as it complicate the thing, special care need to be taken to not block, and it's generally not necessary. Instead a socket is now used, so the plugin has been renomed to jingle stream. - bad connection/error should be better handler in jingle stream plugin, and code should not block anymore - jp pipe commands have been updated accordingly fix bug 237
author Goffi <goffi@goffi.org>
date Thu, 08 Feb 2018 00:37:42 +0100
parents 0046283a285d
children 7ad5f2c4e34a
comparison
equal deleted inserted replaced
2488:78c7992a26ed 2489:e2a7bb875957
21 from sat.core.constants import Const as C 21 from sat.core.constants import Const as C
22 from sat.core.log import getLogger 22 from sat.core.log import getLogger
23 log = getLogger(__name__) 23 log = getLogger(__name__)
24 from sat.core import exceptions 24 from sat.core import exceptions
25 from sat.tools import xml_tools 25 from sat.tools import xml_tools
26 from sat.tools import stream
26 from twisted.internet import defer 27 from twisted.internet import defer
27 from twisted.words.protocols.jabber import jid 28 from twisted.words.protocols.jabber import jid
28 import os 29 import os
29 import os.path 30 import os.path
30 import uuid
31 31
32 32
33 PLUGIN_INFO = { 33 PLUGIN_INFO = {
34 C.PI_NAME: "File Tansfer", 34 C.PI_NAME: "File Tansfer",
35 C.PI_IMPORT_NAME: "FILE", 35 C.PI_IMPORT_NAME: "FILE",
50 SECURITY_LIMIT = 30 50 SECURITY_LIMIT = 30
51 51
52 PROGRESS_ID_KEY = 'progress_id' 52 PROGRESS_ID_KEY = 'progress_id'
53 53
54 54
55 class SatFile(object):
56 """A file-like object to have high level files manipulation"""
57 # TODO: manage "with" statement
58
59 def __init__(self, host, path, mode='rb', uid=None, size=None, data_cb=None, auto_end_signals=True, profile=C.PROF_KEY_NONE):
60 """
61 @param host: %(doc_host)s
62 @param path(str): path of the file to get
63 @param mode(str): same as for built-in "open" function
64 @param uid(unicode, None): unique id identifing this progressing element
65 This uid will be used with self.host.progressGet
66 will be automaticaly generated if None
67 @param size(None, int): size of the file
68 @param data_cb(None, callable): method to call on each data read/write
69 mainly useful to do things like calculating hash
70 @param auto_end_signals(bool): if True, progressFinished and progressError signals are automatically sent
71 if False, you'll have to call self.progressFinished and self.progressError yourself
72 progressStarted signal is always sent automatically
73 """
74 self.host = host
75 self.uid = uid or unicode(uuid.uuid4())
76 self._file = open(path, mode)
77 self.size = size
78 self.data_cb = data_cb
79 self.profile = profile
80 self.auto_end_signals = auto_end_signals
81 metadata = self.getProgressMetadata()
82 self.host.registerProgressCb(self.uid, self.getProgress, metadata, profile=profile)
83 self.host.bridge.progressStarted(self.uid, metadata, self.profile)
84
85 def checkSize(self):
86 """Check that current size correspond to given size
87
88 must be used when the transfer is supposed to be finished
89 @return (bool): True if the position is the same as given size
90 @raise exceptions.NotFound: size has not be specified
91 """
92 position = self._file.tell()
93 if self.size is None:
94 raise exceptions.NotFound
95 return position == self.size
96
97
98 def close(self, progress_metadata=None, error=None):
99 """Close the current file
100
101 @param progress_metadata(None, dict): metadata to send with _onProgressFinished message
102 @param error(None, unicode): set to an error message if progress was not successful
103 mutually exclusive with progress_metadata
104 error can happen even if error is None, if current size differ from given size
105 """
106 if self._file.closed:
107 return # avoid double close (which is allowed) error
108 if error is None:
109 try:
110 size_ok = self.checkSize()
111 except exceptions.NotFound:
112 size_ok = True
113 if not size_ok:
114 error = u'declared and actual size mismatch'
115 log.warning(error)
116 progress_metadata = None
117
118 self._file.close()
119
120 if self.auto_end_signals:
121 if error is None:
122 self.progressFinished(progress_metadata)
123 else:
124 assert progress_metadata is None
125 self.progressError(error)
126
127 self.host.removeProgressCb(self.uid, self.profile)
128
129 def progressFinished(self, metadata=None):
130 if metadata is None:
131 metadata = {}
132 self.host.bridge.progressFinished(self.uid, metadata, self.profile)
133
134 def progressError(self, error):
135 self.host.bridge.progressError(self.uid, error, self.profile)
136
137 def flush(self):
138 self._file.flush()
139
140 def write(self, buf):
141 self._file.write(buf)
142 if self.data_cb is not None:
143 return self.data_cb(buf)
144
145 def read(self, size=-1):
146 read = self._file.read(size)
147 if self.data_cb is not None and read:
148 self.data_cb(read)
149 return read
150
151 def seek(self, offset, whence=os.SEEK_SET):
152 self._file.seek(offset, whence)
153
154 def tell(self):
155 return self._file.tell()
156
157 def mode(self):
158 return self._file.mode()
159
160 def getProgressMetadata(self):
161 """Return progression metadata as given to progressStarted
162
163 @return (dict): metadata (check bridge for documentation)
164 """
165 metadata = {'type': C.META_TYPE_FILE}
166
167 mode = self._file.mode
168 if '+' in mode:
169 pass # we have no direction in read/write modes
170 elif mode in ('r', 'rb'):
171 metadata['direction'] = 'out'
172 elif mode in ('w', 'wb'):
173 metadata['direction'] = 'in'
174 elif 'U' in mode:
175 metadata['direction'] = 'out'
176 else:
177 raise exceptions.InternalError
178
179 metadata['name'] = self._file.name
180
181 return metadata
182
183 def getProgress(self, progress_id, profile):
184 ret = {'position': self._file.tell()}
185 if self.size:
186 ret['size'] = self.size
187 return ret
188
189
190 class FilePlugin(object): 55 class FilePlugin(object):
191 File=SatFile 56 File=stream.SatFile
192 57
193 def __init__(self, host): 58 def __init__(self, host):
194 log.info(_("plugin File initialization")) 59 log.info(_("plugin File initialization"))
195 self.host = host 60 self.host = host
196 host.bridge.addMethod("fileSend", ".plugin", in_sign='sssss', out_sign='a{ss}', method=self._fileSend, async=True) 61 host.bridge.addMethod("fileSend", ".plugin", in_sign='sssss', out_sign='a{ss}', method=self._fileSend, async=True)
197 self._file_callbacks = [] 62 self._file_callbacks = []
198 host.importMenu((D_("Action"), D_("send file")), self._fileSendMenu, security_limit=10, help_string=D_("Send a file"), type_=C.MENU_SINGLE) 63 host.importMenu((D_("Action"), D_("send file")), self._fileSendMenu, security_limit=10, help_string=D_("Send a file"), type_=C.MENU_SINGLE)
199 64
200 def _fileSend(self, peer_jid_s, filepath, name="", file_desc="", profile=C.PROF_KEY_NONE): 65 def _fileSend(self, peer_jid_s, filepath, name="", file_desc="", profile=C.PROF_KEY_NONE):
201 return self.fileSend(jid.JID(peer_jid_s), filepath, name or None, file_desc or None, profile) 66 client = self.host.getClient(profile)
67 return self.fileSend(client, jid.JID(peer_jid_s), filepath, name or None, file_desc or None)
202 68
203 @defer.inlineCallbacks 69 @defer.inlineCallbacks
204 def fileSend(self, peer_jid, filepath, filename=None, file_desc=None, profile=C.PROF_KEY_NONE): 70 def fileSend(self, client, peer_jid, filepath, filename=None, file_desc=None):
205 """Send a file using best available method 71 """Send a file using best available method
206 72
207 @param peer_jid(jid.JID): jid of the destinee 73 @param peer_jid(jid.JID): jid of the destinee
208 @param filepath(str): absolute path to the file 74 @param filepath(str): absolute path to the file
209 @param filename(unicode, None): name to use, or None to find it from filepath 75 @param filename(unicode, None): name to use, or None to find it from filepath
210 @param file_desc(unicode, None): description of the file 76 @param file_desc(unicode, None): description of the file
211 @param profile: %(doc_profile)s 77 @param profile: %(doc_profile)s
212 @return (dict): action dictionary, with progress id in case of success, else xmlui message 78 @return (dict): action dictionary, with progress id in case of success, else xmlui message
213 """ 79 """
214 client = self.host.getClient(profile)
215 if not os.path.isfile(filepath): 80 if not os.path.isfile(filepath):
216 raise exceptions.DataError(u"The given path doesn't link to a file") 81 raise exceptions.DataError(u"The given path doesn't link to a file")
217 if not filename: 82 if not filename:
218 filename = os.path.basename(filepath) or '_' 83 filename = os.path.basename(filepath) or '_'
219 for namespace, callback, priority, method_name in self._file_callbacks: 84 for namespace, callback, priority, method_name in self._file_callbacks:
220 has_feature = yield self.host.hasFeature(client, namespace, peer_jid) 85 has_feature = yield self.host.hasFeature(client, namespace, peer_jid)
221 if has_feature: 86 if has_feature:
222 log.info(u"{name} method will be used to send the file".format(name=method_name)) 87 log.info(u"{name} method will be used to send the file".format(name=method_name))
223 progress_id = yield callback(peer_jid, filepath, filename, file_desc, profile) 88 progress_id = yield callback(client, peer_jid, filepath, filename, file_desc)
224 defer.returnValue({'progress': progress_id}) 89 defer.returnValue({'progress': progress_id})
225 msg = u"Can't find any method to send file to {jid}".format(jid=peer_jid.full()) 90 msg = u"Can't find any method to send file to {jid}".format(jid=peer_jid.full())
226 log.warning(msg) 91 log.warning(msg)
227 defer.returnValue({'xmlui': xml_tools.note(u"Can't transfer file", msg, C.XMLUI_DATA_LVL_WARNING).toXml()}) 92 defer.returnValue({'xmlui': xml_tools.note(u"Can't transfer file", msg, C.XMLUI_DATA_LVL_WARNING).toXml()})
228 93
229 def _onFileChoosed(self, peer_jid, data, profile): 94 def _onFileChoosed(self, client, peer_jid, data):
230 cancelled = C.bool(data.get("cancelled", C.BOOL_FALSE)) 95 cancelled = C.bool(data.get("cancelled", C.BOOL_FALSE))
231 if cancelled: 96 if cancelled:
232 return 97 return
233 path=data['path'] 98 path=data['path']
234 return self.fileSend(peer_jid, path, profile=profile) 99 return self.fileSend(client, peer_jid, path)
235 100
236 def _fileSendMenu(self, data, profile): 101 def _fileSendMenu(self, data, profile):
237 """ XMLUI activated by menu: return file sending UI 102 """ XMLUI activated by menu: return file sending UI
238 103
239 @param profile: %(doc_profile)s 104 @param profile: %(doc_profile)s
241 try: 106 try:
242 jid_ = jid.JID(data['jid']) 107 jid_ = jid.JID(data['jid'])
243 except RuntimeError: 108 except RuntimeError:
244 raise exceptions.DataError(_("Invalid JID")) 109 raise exceptions.DataError(_("Invalid JID"))
245 110
246 file_choosed_id = self.host.registerCallback(lambda data, profile: self._onFileChoosed(jid_, data, profile), with_data=True, one_shot=True) 111 file_choosed_id = self.host.registerCallback(lambda data, profile: self._onFileChoosed(self.host.getClient(profile), jid_, data), with_data=True, one_shot=True)
247 xml_ui = xml_tools.XMLUI( 112 xml_ui = xml_tools.XMLUI(
248 C.XMLUI_DIALOG, 113 C.XMLUI_DIALOG,
249 dialog_opt = { 114 dialog_opt = {
250 C.XMLUI_DATA_TYPE: C.XMLUI_DIALOG_FILE, 115 C.XMLUI_DATA_TYPE: C.XMLUI_DIALOG_FILE,
251 C.XMLUI_DATA_MESS: _(SENDING).format(peer=jid_.full())}, 116 C.XMLUI_DATA_MESS: _(SENDING).format(peer=jid_.full())},
276 raise exceptions.NotFound(u"The namespace to unregister doesn't exist") 141 raise exceptions.NotFound(u"The namespace to unregister doesn't exist")
277 142
278 # Dialogs with user 143 # Dialogs with user
279 # the overwrite check is done here 144 # the overwrite check is done here
280 145
281 def _openFileWrite(self, file_path, transfer_data, file_data, profile): 146 def _openFileWrite(self, client, file_path, transfer_data, file_data, stream_object):
282 assert 'file_obj' not in transfer_data 147 if stream_object:
283 transfer_data['file_obj'] = SatFile( 148 assert 'stream_object' not in transfer_data
284 self.host, 149 transfer_data['stream_object'] = stream.FileStreamObject(
285 file_path, 150 self.host,
286 'wb', 151 client,
287 uid=file_data[PROGRESS_ID_KEY], 152 file_path,
288 size=file_data['size'], 153 mode='wb',
289 data_cb = file_data.get('data_cb'), 154 uid=file_data[PROGRESS_ID_KEY],
290 profile=profile, 155 size=file_data['size'],
291 ) 156 data_cb = file_data.get('data_cb'),
292 157 )
293 def _gotConfirmation(self, data, peer_jid, transfer_data, file_data, profile): 158 else:
159 assert 'file_obj' not in transfer_data
160 transfer_data['file_obj'] = stream.SatFile(
161 self.host,
162 client,
163 file_path,
164 mode='wb',
165 uid=file_data[PROGRESS_ID_KEY],
166 size=file_data['size'],
167 data_cb = file_data.get('data_cb'),
168 )
169
170 def _gotConfirmation(self, data, client, peer_jid, transfer_data, file_data, stream_object):
294 """Called when the permission and dest path have been received 171 """Called when the permission and dest path have been received
295 172
296 @param peer_jid(jid.JID): jid of the file sender 173 @param peer_jid(jid.JID): jid of the file sender
297 @param transfer_data(dict): same as for [self.getDestDir] 174 @param transfer_data(dict): same as for [self.getDestDir]
298 @param file_data(dict): same as for [self.getDestDir] 175 @param file_data(dict): same as for [self.getDestDir]
299 @param profile: %(doc_profile)s 176 @param stream_object(bool): same as for [self.getDestDir]
300 return (bool): True if copy is wanted and OK 177 return (bool): True if copy is wanted and OK
301 False if user wants to cancel 178 False if user wants to cancel
302 if file exists ask confirmation and call again self._getDestDir if needed 179 if file exists ask confirmation and call again self._getDestDir if needed
303 """ 180 """
304 if data.get('cancelled', False): 181 if data.get('cancelled', False):
309 186
310 # we manage case where file already exists 187 # we manage case where file already exists
311 if os.path.exists(file_path): 188 if os.path.exists(file_path):
312 def check_overwrite(overwrite): 189 def check_overwrite(overwrite):
313 if overwrite: 190 if overwrite:
314 self._openFileWrite(file_path, transfer_data, file_data, profile) 191 self._openFileWrite(client, file_path, transfer_data, file_data, stream_object)
315 return True 192 return True
316 else: 193 else:
317 return self.getDestDir(peer_jid, transfer_data, file_data, profile) 194 return self.getDestDir(client, peer_jid, transfer_data, file_data)
318 195
319 exists_d = xml_tools.deferConfirm( 196 exists_d = xml_tools.deferConfirm(
320 self.host, 197 self.host,
321 _(CONFIRM_OVERWRITE).format(file_path), 198 _(CONFIRM_OVERWRITE).format(file_path),
322 _(CONFIRM_OVERWRITE_TITLE), 199 _(CONFIRM_OVERWRITE_TITLE),
323 action_extra={'meta_from_jid': peer_jid.full(), 200 action_extra={'meta_from_jid': peer_jid.full(),
324 'meta_type': C.META_TYPE_OVERWRITE, 201 'meta_type': C.META_TYPE_OVERWRITE,
325 'meta_progress_id': file_data[PROGRESS_ID_KEY] 202 'meta_progress_id': file_data[PROGRESS_ID_KEY]
326 }, 203 },
327 security_limit=SECURITY_LIMIT, 204 security_limit=SECURITY_LIMIT,
328 profile=profile) 205 profile=client.profile)
329 exists_d.addCallback(check_overwrite) 206 exists_d.addCallback(check_overwrite)
330 return exists_d 207 return exists_d
331 208
332 self._openFileWrite(file_path, transfer_data, file_data, profile) 209 self._openFileWrite(client, file_path, transfer_data, file_data, stream_object)
333 return True 210 return True
334 211
335 def getDestDir(self, peer_jid, transfer_data, file_data, profile): 212 def getDestDir(self, client, peer_jid, transfer_data, file_data, stream_object=False):
336 """Request confirmation and destination dir to user 213 """Request confirmation and destination dir to user
337 214
338 Overwrite confirmation is managed. 215 Overwrite confirmation is managed.
339 if transfer is confirmed, 'file_obj' is added to transfer_data 216 if transfer is confirmed, 'file_obj' is added to transfer_data
340 @param peer_jid(jid.JID): jid of the file sender 217 @param peer_jid(jid.JID): jid of the file sender
341 @param filename(unicode): name of the file 218 @param filename(unicode): name of the file
342 @param transfer_data(dict): data of the transfer session, 219 @param transfer_data(dict): data of the transfer session,
343 it will be only used to store the file_obj. 220 it will be only used to store the file_obj.
344 "file_obj" key *MUST NOT* exist before using getDestDir 221 "file_obj" (or "stream_object") key *MUST NOT* exist before using getDestDir
345 @param file_data(dict): information about the file to be transfered 222 @param file_data(dict): information about the file to be transfered
346 It MUST contain the following keys: 223 It MUST contain the following keys:
347 - peer_jid (jid.JID): other peer jid 224 - peer_jid (jid.JID): other peer jid
348 - name (unicode): name of the file to trasnsfer 225 - name (unicode): name of the file to trasnsfer
349 the name must not be empty or contain a "/" character 226 the name must not be empty or contain a "/" character
353 It *MUST NOT* contain the "peer" key 230 It *MUST NOT* contain the "peer" key
354 It may contain: 231 It may contain:
355 - data_cb (callable): method called on each data read/write 232 - data_cb (callable): method called on each data read/write
356 "file_path" will be added to this dict once destination selected 233 "file_path" will be added to this dict once destination selected
357 "size_human" will also be added with human readable file size 234 "size_human" will also be added with human readable file size
358 @param profile: %(doc_profile)s 235 @param stream_object(bool): if True, a stream_object will be used instead of file_obj
236 a stream.FileStreamObject will be used
359 return (defer.Deferred): True if transfer is accepted 237 return (defer.Deferred): True if transfer is accepted
360 """ 238 """
361 filename = file_data['name'] 239 filename = file_data['name']
362 assert filename and not '/' in filename 240 assert filename and not '/' in filename
363 assert PROGRESS_ID_KEY in file_data 241 assert PROGRESS_ID_KEY in file_data
371 action_extra={'meta_from_jid': peer_jid.full(), 249 action_extra={'meta_from_jid': peer_jid.full(),
372 'meta_type': C.META_TYPE_FILE, 250 'meta_type': C.META_TYPE_FILE,
373 'meta_progress_id': file_data[PROGRESS_ID_KEY] 251 'meta_progress_id': file_data[PROGRESS_ID_KEY]
374 }, 252 },
375 security_limit=SECURITY_LIMIT, 253 security_limit=SECURITY_LIMIT,
376 profile=profile) 254 profile=client.profile)
377 d.addCallback(self._gotConfirmation, peer_jid, transfer_data, file_data, profile) 255 d.addCallback(self._gotConfirmation, client, peer_jid, transfer_data, file_data, stream_object)
378 return d 256 return d