comparison src/plugins/plugin_xep_0096.py @ 1524:7b0fcefd52d4

plugin XEP-0047, XEP-0096: In-Band Bystream plugin cleaning: - some renaming, comments improvments, etc - progress callback is no more managed here, as it will be managed by application - no more file data is used, beside file_obj - a proper Deferred is used instead of success and error callbacks - more clean error sending method plugin XEP-0096 has been updated to handle changes. Its temporarily partially broken though
author Goffi <goffi@goffi.org>
date Fri, 25 Sep 2015 19:19:12 +0200
parents 3265a2639182
children 7cc29634b6ef
comparison
equal deleted inserted replaced
1523:0209f8d35873 1524:7b0fcefd52d4
24 from twisted.words.xish import domish 24 from twisted.words.xish import domish
25 from twisted.words.protocols.jabber import jid 25 from twisted.words.protocols.jabber import jid
26 from twisted.words.protocols import jabber 26 from twisted.words.protocols import jabber
27 import os 27 import os
28 from twisted.internet import reactor 28 from twisted.internet import reactor
29 from twisted.python import failure
29 30
30 from wokkel import data_form 31 from wokkel import data_form
31 32
32 IQ_SET = '/iq[@type="set"]' 33 IQ_SET = '/iq[@type="set"]'
33 PROFILE_NAME = "file-transfer" 34 PROFILE_NAME = "file-transfer"
161 range_offset = file_obj.tell() 162 range_offset = file_obj.tell()
162 self.host.plugins["XEP-0065"].prepareToReceive(jid.JID(data['from']), sid, file_obj, int(data["size"]), self._transferSucceeded, self._transferFailed, profile) 163 self.host.plugins["XEP-0065"].prepareToReceive(jid.JID(data['from']), sid, file_obj, int(data["size"]), self._transferSucceeded, self._transferFailed, profile)
163 elif stream_method == self.host.plugins["XEP-0047"].NAMESPACE: 164 elif stream_method == self.host.plugins["XEP-0047"].NAMESPACE:
164 file_obj = self._getFileObject(dest_path, can_range) 165 file_obj = self._getFileObject(dest_path, can_range)
165 range_offset = file_obj.tell() 166 range_offset = file_obj.tell()
166 self.host.plugins["XEP-0047"].prepareToReceive(jid.JID(data['from']), sid, file_obj, int(data["size"]), self._transferSucceeded, self._transferFailed, profile) 167 d = self.host.plugins["XEP-0047"].createSession(file_obj, jid.JID(data['from']), sid, int(data["size"]), profile)
168 d.addCallback(self._transferSucceeded, sid, file_obj, self.host.plugins["XEP-0047"].NAMESPACE, profile)
169 d.addErrback(self._transferFailed, sid, file_obj, self.host.plugins["XEP-0047"].NAMESPACE, profile)
167 else: 170 else:
168 log.error(_("Unknown stream method, this should not happen at this stage, cancelling transfer")) 171 log.error(_("Unknown stream method, this should not happen at this stage, cancelling transfer"))
169 del(client._xep_0096_waiting_for_approval[sid]) 172 del(client._xep_0096_waiting_for_approval[sid])
170 return 173 return
171 174
182 else: 185 else:
183 log.debug(_(u"Transfer [%s] refused") % sid) 186 log.debug(_(u"Transfer [%s] refused") % sid)
184 self.host.plugins["XEP-0095"].sendRejectedError(data["id"], data['from'], profile=profile) 187 self.host.plugins["XEP-0095"].sendRejectedError(data["id"], data['from'], profile=profile)
185 del(client._xep_0096_waiting_for_approval[sid]) 188 del(client._xep_0096_waiting_for_approval[sid])
186 189
187 def _transferSucceeded(self, sid, file_obj, stream_method, profile): 190 def _transferSucceeded(self, dummy, sid, file_obj, stream_method, profile):
191 self.transferSucceeded(sid, file_obj, stream_method, profile)
192
193 def transferSucceeded(self, dummy, sid, file_obj, stream_method, profile):
188 """Called by the stream method when transfer successfuly finished 194 """Called by the stream method when transfer successfuly finished
189 @param id: stream id""" 195 @param id: stream id"""
190 client = self.host.getClient(profile) 196 client = self.host.getClient(profile)
191 file_obj.close() 197 file_obj.close()
192 log.info(_('Transfer %s successfuly finished') % sid) 198 log.info(_('Transfer %s successfuly finished') % sid)
193 del(client._xep_0096_waiting_for_approval[sid]) 199 del(client._xep_0096_waiting_for_approval[sid])
194 200
195 def _transferFailed(self, sid, file_obj, stream_method, reason, profile): 201 def _transferFailed(self, sid, file_obj, stream_method, reason, profile):
202 self.transferFailed(failure.Failure(Exception(reason)), sid, file_obj, stream_method, profile)
203
204 def transferFailed(self, failure, sid, file_obj, stream_method, profile):
196 """Called when something went wrong with the transfer 205 """Called when something went wrong with the transfer
206
197 @param id: stream id 207 @param id: stream id
198 @param reason: can be TIMEOUT, IO_ERROR, PROTOCOL_ERROR""" 208 """
199 client = self.host.getClient(profile) 209 client = self.host.getClient(profile)
200 data, timeout, stream_method, failed_methods = client._xep_0096_waiting_for_approval[sid] 210 data, timeout, stream_method, failed_methods = client._xep_0096_waiting_for_approval[sid]
201 log.warning(_(u'Transfer %(id)s failed with stream method %(s_method)s: %(reason)s') % { 211 log.warning(_(u'Transfer %(id)s failed with stream method %(s_method)s: %(reason)s') % {
202 'id': sid, 212 'id': sid,
203 's_method': stream_method, 213 's_method': stream_method,
204 'reason': reason}) 214 'reason': unicode(failure)})
205 filepath = file_obj.name 215 filepath = file_obj.name
206 file_obj.close() 216 file_obj.close()
207 os.remove(filepath) 217 os.remove(filepath)
208 #TODO: session remenber (within a time limit) when a stream method fail, and avoid that stream method with full jid for the rest of the session 218 #TODO: session remenber (within a time limit) when a stream method fail, and avoid that stream method with full jid for the rest of the session
209 log.warning(_("All stream methods failed, can't transfer the file")) 219 log.warning(_("All stream methods failed, can't transfer the file"))
248 258
249 if stream_method == self.host.plugins["XEP-0065"].NAMESPACE: 259 if stream_method == self.host.plugins["XEP-0065"].NAMESPACE:
250 file_obj = open(filepath, 'r') 260 file_obj = open(filepath, 'r')
251 if range_offset: 261 if range_offset:
252 file_obj.seek(range_offset) 262 file_obj.seek(range_offset)
253 self.host.plugins["XEP-0065"].startStream(file_obj, jid.JID(IQ['from']), sid, range_length, self.sendSuccessCb, self.sendFailureCb, size, profile) 263 self.host.plugins["XEP-0065"].startStream(file_obj, jid.JID(IQ['from']), sid, range_length, self._sendSuccessCb, self._sendFailureCb, size, profile)
254 elif stream_method == self.host.plugins["XEP-0047"].NAMESPACE: 264 elif stream_method == self.host.plugins["XEP-0047"].NAMESPACE:
255 file_obj = open(filepath, 'r') 265 file_obj = open(filepath, 'r')
256 if range_offset: 266 if range_offset:
257 file_obj.seek(range_offset) 267 file_obj.seek(range_offset)
258 self.host.plugins["XEP-0047"].startStream(file_obj, jid.JID(IQ['from']), sid, range_length, self.sendSuccessCb, self.sendFailureCb, size, profile) 268 d = self.host.plugins["XEP-0047"].startStream(file_obj, jid.JID(IQ['from']), sid, profile=profile)
269 d.addCallback(self.sendSuccessCb, sid, file_obj, self.host.plugins["XEP-0047"].NAMESPACE, profile)
270 d.addErrback(self.sendFailureCb, sid, file_obj, self.host.plugins["XEP-0047"].NAMESPACE, profile)
259 else: 271 else:
260 log.warning(_("Invalid stream method received")) 272 log.warning(_("Invalid stream method received"))
261 273
262 def sendFile(self, to_jid, filepath, data={}, profile_key=C.PROF_KEY_NONE): 274 def sendFile(self, to_jid, filepath, data={}, profile_key=C.PROF_KEY_NONE):
263 """send a file using XEP-0096 275 """send a file using XEP-0096
286 298
287 sid, offer = self.host.plugins["XEP-0095"].proposeStream(jid.JID(to_jid), PROFILE, feature_elt, file_transfer_elts, profile_key=profile) 299 sid, offer = self.host.plugins["XEP-0095"].proposeStream(jid.JID(to_jid), PROFILE, feature_elt, file_transfer_elts, profile_key=profile)
288 offer.addCallback(self.fileCb, filepath, sid, size, profile) 300 offer.addCallback(self.fileCb, filepath, sid, size, profile)
289 return sid 301 return sid
290 302
291 def sendSuccessCb(self, sid, file_obj, stream_method, profile): 303 def _sendSuccessCb(self, sid, file_obj, stream_method, profile):
304 self.sendSuccessCb(sid, file_obj, stream_method, profile)
305
306 def sendSuccessCb(self, dummy, sid, file_obj, stream_method, profile):
292 log.info(_(u'Transfer %(sid)s successfuly finished [%(profile)s]') 307 log.info(_(u'Transfer %(sid)s successfuly finished [%(profile)s]')
293 % {"sid": sid, "profile": profile}) 308 % {"sid": sid, "profile": profile})
294 file_obj.close() 309 file_obj.close()
295 310
296 def sendFailureCb(self, sid, file_obj, stream_method, reason, profile): 311 def _sendFailureCb(self, sid, file_obj, stream_method, reason, profile):
312 self.sendFailureCb(failure.Failure(Exception(reason)), sid, file_obj, stream_method, profile)
313
314 def sendFailureCb(self, failure, sid, file_obj, stream_method, profile):
297 file_obj.close() 315 file_obj.close()
298 log.warning(_(u'Transfer %(id)s failed with stream method %(s_method)s: %(reason)s [%(profile)s]') % {'id': sid, "s_method": stream_method, 'reason': reason, 'profile': profile}) 316 log.warning(_(u'Transfer %(id)s failed with stream method %(s_method)s: %(reason)s [%(profile)s]') % {'id': sid, "s_method": stream_method, 'reason': unicode(failure), 'profile': profile})