Mercurial > libervia-backend
comparison src/plugins/plugin_xep_0096.py @ 1524:7b0fcefd52d4
plugin XEP-0047, XEP-0096: In-Band Bytestreams plugin cleaning:
- some renaming, comments improvements, etc
- progress callback is no longer managed here, as it will be managed by the application
- file data is no longer used, besides file_obj
- a proper Deferred is used instead of success and error callbacks
- cleaner error sending method
plugin XEP-0096 has been updated to handle changes. It's temporarily partially broken, though
author | Goffi <goffi@goffi.org> |
---|---|
date | Fri, 25 Sep 2015 19:19:12 +0200 |
parents | 3265a2639182 |
children | 7cc29634b6ef |
comparison
equal
deleted
inserted
replaced
1523:0209f8d35873 | 1524:7b0fcefd52d4 |
---|---|
24 from twisted.words.xish import domish | 24 from twisted.words.xish import domish |
25 from twisted.words.protocols.jabber import jid | 25 from twisted.words.protocols.jabber import jid |
26 from twisted.words.protocols import jabber | 26 from twisted.words.protocols import jabber |
27 import os | 27 import os |
28 from twisted.internet import reactor | 28 from twisted.internet import reactor |
29 from twisted.python import failure | |
29 | 30 |
30 from wokkel import data_form | 31 from wokkel import data_form |
31 | 32 |
32 IQ_SET = '/iq[@type="set"]' | 33 IQ_SET = '/iq[@type="set"]' |
33 PROFILE_NAME = "file-transfer" | 34 PROFILE_NAME = "file-transfer" |
161 range_offset = file_obj.tell() | 162 range_offset = file_obj.tell() |
162 self.host.plugins["XEP-0065"].prepareToReceive(jid.JID(data['from']), sid, file_obj, int(data["size"]), self._transferSucceeded, self._transferFailed, profile) | 163 self.host.plugins["XEP-0065"].prepareToReceive(jid.JID(data['from']), sid, file_obj, int(data["size"]), self._transferSucceeded, self._transferFailed, profile) |
163 elif stream_method == self.host.plugins["XEP-0047"].NAMESPACE: | 164 elif stream_method == self.host.plugins["XEP-0047"].NAMESPACE: |
164 file_obj = self._getFileObject(dest_path, can_range) | 165 file_obj = self._getFileObject(dest_path, can_range) |
165 range_offset = file_obj.tell() | 166 range_offset = file_obj.tell() |
166 self.host.plugins["XEP-0047"].prepareToReceive(jid.JID(data['from']), sid, file_obj, int(data["size"]), self._transferSucceeded, self._transferFailed, profile) | 167 d = self.host.plugins["XEP-0047"].createSession(file_obj, jid.JID(data['from']), sid, int(data["size"]), profile) |
168 d.addCallback(self._transferSucceeded, sid, file_obj, self.host.plugins["XEP-0047"].NAMESPACE, profile) | |
169 d.addErrback(self._transferFailed, sid, file_obj, self.host.plugins["XEP-0047"].NAMESPACE, profile) | |
167 else: | 170 else: |
168 log.error(_("Unknown stream method, this should not happen at this stage, cancelling transfer")) | 171 log.error(_("Unknown stream method, this should not happen at this stage, cancelling transfer")) |
169 del(client._xep_0096_waiting_for_approval[sid]) | 172 del(client._xep_0096_waiting_for_approval[sid]) |
170 return | 173 return |
171 | 174 |
182 else: | 185 else: |
183 log.debug(_(u"Transfer [%s] refused") % sid) | 186 log.debug(_(u"Transfer [%s] refused") % sid) |
184 self.host.plugins["XEP-0095"].sendRejectedError(data["id"], data['from'], profile=profile) | 187 self.host.plugins["XEP-0095"].sendRejectedError(data["id"], data['from'], profile=profile) |
185 del(client._xep_0096_waiting_for_approval[sid]) | 188 del(client._xep_0096_waiting_for_approval[sid]) |
186 | 189 |
187 def _transferSucceeded(self, sid, file_obj, stream_method, profile): | 190 def _transferSucceeded(self, dummy, sid, file_obj, stream_method, profile): |
191 self.transferSucceeded(sid, file_obj, stream_method, profile) | |
192 | |
193 def transferSucceeded(self, dummy, sid, file_obj, stream_method, profile): | |
188 """Called by the stream method when transfer successfuly finished | 194 """Called by the stream method when transfer successfuly finished |
189 @param id: stream id""" | 195 @param id: stream id""" |
190 client = self.host.getClient(profile) | 196 client = self.host.getClient(profile) |
191 file_obj.close() | 197 file_obj.close() |
192 log.info(_('Transfer %s successfuly finished') % sid) | 198 log.info(_('Transfer %s successfuly finished') % sid) |
193 del(client._xep_0096_waiting_for_approval[sid]) | 199 del(client._xep_0096_waiting_for_approval[sid]) |
194 | 200 |
195 def _transferFailed(self, sid, file_obj, stream_method, reason, profile): | 201 def _transferFailed(self, sid, file_obj, stream_method, reason, profile): |
202 self.transferFailed(failure.Failure(Exception(reason)), sid, file_obj, stream_method, profile) | |
203 | |
204 def transferFailed(self, failure, sid, file_obj, stream_method, profile): | |
196 """Called when something went wrong with the transfer | 205 """Called when something went wrong with the transfer |
206 | |
197 @param id: stream id | 207 @param id: stream id |
198 @param reason: can be TIMEOUT, IO_ERROR, PROTOCOL_ERROR""" | 208 """ |
199 client = self.host.getClient(profile) | 209 client = self.host.getClient(profile) |
200 data, timeout, stream_method, failed_methods = client._xep_0096_waiting_for_approval[sid] | 210 data, timeout, stream_method, failed_methods = client._xep_0096_waiting_for_approval[sid] |
201 log.warning(_(u'Transfer %(id)s failed with stream method %(s_method)s: %(reason)s') % { | 211 log.warning(_(u'Transfer %(id)s failed with stream method %(s_method)s: %(reason)s') % { |
202 'id': sid, | 212 'id': sid, |
203 's_method': stream_method, | 213 's_method': stream_method, |
204 'reason': reason}) | 214 'reason': unicode(failure)}) |
205 filepath = file_obj.name | 215 filepath = file_obj.name |
206 file_obj.close() | 216 file_obj.close() |
207 os.remove(filepath) | 217 os.remove(filepath) |
208 #TODO: session remenber (within a time limit) when a stream method fail, and avoid that stream method with full jid for the rest of the session | 218 #TODO: session remenber (within a time limit) when a stream method fail, and avoid that stream method with full jid for the rest of the session |
209 log.warning(_("All stream methods failed, can't transfer the file")) | 219 log.warning(_("All stream methods failed, can't transfer the file")) |
248 | 258 |
249 if stream_method == self.host.plugins["XEP-0065"].NAMESPACE: | 259 if stream_method == self.host.plugins["XEP-0065"].NAMESPACE: |
250 file_obj = open(filepath, 'r') | 260 file_obj = open(filepath, 'r') |
251 if range_offset: | 261 if range_offset: |
252 file_obj.seek(range_offset) | 262 file_obj.seek(range_offset) |
253 self.host.plugins["XEP-0065"].startStream(file_obj, jid.JID(IQ['from']), sid, range_length, self.sendSuccessCb, self.sendFailureCb, size, profile) | 263 self.host.plugins["XEP-0065"].startStream(file_obj, jid.JID(IQ['from']), sid, range_length, self._sendSuccessCb, self._sendFailureCb, size, profile) |
254 elif stream_method == self.host.plugins["XEP-0047"].NAMESPACE: | 264 elif stream_method == self.host.plugins["XEP-0047"].NAMESPACE: |
255 file_obj = open(filepath, 'r') | 265 file_obj = open(filepath, 'r') |
256 if range_offset: | 266 if range_offset: |
257 file_obj.seek(range_offset) | 267 file_obj.seek(range_offset) |
258 self.host.plugins["XEP-0047"].startStream(file_obj, jid.JID(IQ['from']), sid, range_length, self.sendSuccessCb, self.sendFailureCb, size, profile) | 268 d = self.host.plugins["XEP-0047"].startStream(file_obj, jid.JID(IQ['from']), sid, profile=profile) |
269 d.addCallback(self.sendSuccessCb, sid, file_obj, self.host.plugins["XEP-0047"].NAMESPACE, profile) | |
270 d.addErrback(self.sendFailureCb, sid, file_obj, self.host.plugins["XEP-0047"].NAMESPACE, profile) | |
259 else: | 271 else: |
260 log.warning(_("Invalid stream method received")) | 272 log.warning(_("Invalid stream method received")) |
261 | 273 |
262 def sendFile(self, to_jid, filepath, data={}, profile_key=C.PROF_KEY_NONE): | 274 def sendFile(self, to_jid, filepath, data={}, profile_key=C.PROF_KEY_NONE): |
263 """send a file using XEP-0096 | 275 """send a file using XEP-0096 |
286 | 298 |
287 sid, offer = self.host.plugins["XEP-0095"].proposeStream(jid.JID(to_jid), PROFILE, feature_elt, file_transfer_elts, profile_key=profile) | 299 sid, offer = self.host.plugins["XEP-0095"].proposeStream(jid.JID(to_jid), PROFILE, feature_elt, file_transfer_elts, profile_key=profile) |
288 offer.addCallback(self.fileCb, filepath, sid, size, profile) | 300 offer.addCallback(self.fileCb, filepath, sid, size, profile) |
289 return sid | 301 return sid |
290 | 302 |
291 def sendSuccessCb(self, sid, file_obj, stream_method, profile): | 303 def _sendSuccessCb(self, sid, file_obj, stream_method, profile): |
304 self.sendSuccessCb(sid, file_obj, stream_method, profile) | |
305 | |
306 def sendSuccessCb(self, dummy, sid, file_obj, stream_method, profile): | |
292 log.info(_(u'Transfer %(sid)s successfuly finished [%(profile)s]') | 307 log.info(_(u'Transfer %(sid)s successfuly finished [%(profile)s]') |
293 % {"sid": sid, "profile": profile}) | 308 % {"sid": sid, "profile": profile}) |
294 file_obj.close() | 309 file_obj.close() |
295 | 310 |
296 def sendFailureCb(self, sid, file_obj, stream_method, reason, profile): | 311 def _sendFailureCb(self, sid, file_obj, stream_method, reason, profile): |
312 self.sendFailureCb(failure.Failure(Exception(reason)), sid, file_obj, stream_method, profile) | |
313 | |
314 def sendFailureCb(self, failure, sid, file_obj, stream_method, profile): | |
297 file_obj.close() | 315 file_obj.close() |
298 log.warning(_(u'Transfer %(id)s failed with stream method %(s_method)s: %(reason)s [%(profile)s]') % {'id': sid, "s_method": stream_method, 'reason': reason, 'profile': profile}) | 316 log.warning(_(u'Transfer %(id)s failed with stream method %(s_method)s: %(reason)s [%(profile)s]') % {'id': sid, "s_method": stream_method, 'reason': unicode(failure), 'profile': profile}) |