Mercurial > libervia-backend
comparison src/plugins/plugin_xep_0096.py @ 2489:e2a7bb875957
plugin pipe/stream, file transfer: refactoring and improvements:
this is a big patch as things had to be changed at the same time.
- changed methods using profile argument to use client instead
- moved SatFile to a new tools.stream module, as it should be part of core, not a plugin
- new IStreamProducer interface, to handle starting a pull producer
- new FileStreamObject which creates a stream producer/consumer from a SatFile
- plugin pipe no longer uses a unix named pipe, as it complicates things:
special care needs to be taken not to block, and it's generally not necessary.
Instead a socket is now used, so the plugin has been renamed to jingle stream.
- bad connections/errors should be better handled in the jingle stream plugin, and the code should not block anymore
- jp pipe commands have been updated accordingly
fix bug 237
author | Goffi <goffi@goffi.org> |
---|---|
date | Thu, 08 Feb 2018 00:37:42 +0100 |
parents | 0046283a285d |
children | 7ad5f2c4e34a |
comparison
equal
deleted
inserted
replaced
2488:78c7992a26ed | 2489:e2a7bb875957 |
---|---|
21 from sat.core.constants import Const as C | 21 from sat.core.constants import Const as C |
22 from sat.core.log import getLogger | 22 from sat.core.log import getLogger |
23 log = getLogger(__name__) | 23 log = getLogger(__name__) |
24 from sat.core import exceptions | 24 from sat.core import exceptions |
25 from sat.tools import xml_tools | 25 from sat.tools import xml_tools |
26 from sat.tools import stream | |
26 from twisted.words.xish import domish | 27 from twisted.words.xish import domish |
27 from twisted.words.protocols.jabber import jid | 28 from twisted.words.protocols.jabber import jid |
28 from twisted.words.protocols.jabber import error | 29 from twisted.words.protocols.jabber import error |
29 import os | 30 import os |
30 | 31 |
61 host.bridge.addMethod("siSendFile", ".plugin", in_sign='sssss', out_sign='s', method=self._sendFile) | 62 host.bridge.addMethod("siSendFile", ".plugin", in_sign='sssss', out_sign='s', method=self._sendFile) |
62 | 63 |
63 def unload(self): | 64 def unload(self): |
64 self._si.unregisterSIProfile(SI_PROFILE_NAME) | 65 self._si.unregisterSIProfile(SI_PROFILE_NAME) |
65 | 66 |
66 def _badRequest(self, iq_elt, message=None, profile=C.PROF_KEY_NONE): | 67 def _badRequest(self, client, iq_elt, message=None): |
67 """Send a bad-request error | 68 """Send a bad-request error |
68 | 69 |
69 @param iq_elt(domish.Element): initial <IQ> element of the SI request | 70 @param iq_elt(domish.Element): initial <IQ> element of the SI request |
70 @param message(None, unicode): informational message to display in the logs | 71 @param message(None, unicode): informational message to display in the logs |
71 @param profile: %(doc_profile)s | |
72 """ | 72 """ |
73 if message is not None: | 73 if message is not None: |
74 log.warning(message) | 74 log.warning(message) |
75 self._si.sendError(iq_elt, 'bad-request', profile) | 75 self._si.sendError(client, iq_elt, 'bad-request') |
76 | 76 |
77 def _parseRange(self, parent_elt, file_size): | 77 def _parseRange(self, parent_elt, file_size): |
78 """find and parse <range/> element | 78 """find and parse <range/> element |
79 | 79 |
80 @param parent_elt(domish.Element): direct parent of the <range/> element | 80 @param parent_elt(domish.Element): direct parent of the <range/> element |
105 if range_offset != 0 or range_length != file_size: | 105 if range_offset != 0 or range_length != file_size: |
106 raise NotImplementedError # FIXME | 106 raise NotImplementedError # FIXME |
107 | 107 |
108 return range_, range_offset, range_length | 108 return range_, range_offset, range_length |
109 | 109 |
110 def _transferRequest(self, iq_elt, si_id, si_mime_type, si_elt, profile): | 110 def _transferRequest(self, client, iq_elt, si_id, si_mime_type, si_elt): |
111 """Called when a file transfer is requested | 111 """Called when a file transfer is requested |
112 | 112 |
113 @param iq_elt(domish.Element): initial <IQ> element of the SI request | 113 @param iq_elt(domish.Element): initial <IQ> element of the SI request |
114 @param si_id(unicode): Stream Initiation session id | 114 @param si_id(unicode): Stream Initiation session id |
115 @param si_mime_type("unicode"): Mime type of the file (or default "application/octet-stream" if unknown) | 115 @param si_mime_type("unicode"): Mime type of the file (or default "application/octet-stream" if unknown) |
116 @param si_elt(domish.Element): request | 116 @param si_elt(domish.Element): request |
117 @param profile: %(doc_profile)s | |
118 """ | 117 """ |
119 log.info(_("XEP-0096 file transfer requested")) | 118 log.info(_("XEP-0096 file transfer requested")) |
120 peer_jid = jid.JID(iq_elt['from']) | 119 peer_jid = jid.JID(iq_elt['from']) |
121 | 120 |
122 try: | 121 try: |
123 file_elt = si_elt.elements(NS_SI_FT, "file").next() | 122 file_elt = si_elt.elements(NS_SI_FT, "file").next() |
124 except StopIteration: | 123 except StopIteration: |
125 return self._badRequest(iq_elt, "No <file/> element found in SI File Transfer request", profile) | 124 return self._badRequest(client, iq_elt, "No <file/> element found in SI File Transfer request") |
126 | 125 |
127 try: | 126 try: |
128 feature_elt = self.host.plugins["XEP-0020"].getFeatureElt(si_elt) | 127 feature_elt = self.host.plugins["XEP-0020"].getFeatureElt(si_elt) |
129 except exceptions.NotFound: | 128 except exceptions.NotFound: |
130 return self._badRequest(iq_elt, "No <feature/> element found in SI File Transfer request", profile) | 129 return self._badRequest(client, iq_elt, "No <feature/> element found in SI File Transfer request") |
131 | 130 |
132 try: | 131 try: |
133 filename = file_elt["name"] | 132 filename = file_elt["name"] |
134 file_size = int(file_elt["size"]) | 133 file_size = int(file_elt["size"]) |
135 except (KeyError, ValueError): | 134 except (KeyError, ValueError): |
136 return self._badRequest(iq_elt, "Malformed SI File Transfer request", profile) | 135 return self._badRequest(client, iq_elt, "Malformed SI File Transfer request") |
137 | 136 |
138 file_date = file_elt.getAttribute("date") | 137 file_date = file_elt.getAttribute("date") |
139 file_hash = file_elt.getAttribute("hash") | 138 file_hash = file_elt.getAttribute("hash") |
140 | 139 |
141 log.info(u"File proposed: name=[{name}] size={size}".format(name=filename, size=file_size)) | 140 log.info(u"File proposed: name=[{name}] size={size}".format(name=filename, size=file_size)) |
146 file_desc = '' | 145 file_desc = '' |
147 | 146 |
148 try: | 147 try: |
149 range_, range_offset, range_length = self._parseRange(file_elt, file_size) | 148 range_, range_offset, range_length = self._parseRange(file_elt, file_size) |
150 except ValueError: | 149 except ValueError: |
151 return self._badRequest(iq_elt, "Malformed SI File Transfer request", profile) | 150 return self._badRequest(client, iq_elt, "Malformed SI File Transfer request") |
152 | 151 |
153 try: | 152 try: |
154 stream_method = self.host.plugins["XEP-0020"].negotiate(feature_elt, 'stream-method', self.managed_stream_m, namespace=None) | 153 stream_method = self.host.plugins["XEP-0020"].negotiate(feature_elt, 'stream-method', self.managed_stream_m, namespace=None) |
155 except KeyError: | 154 except KeyError: |
156 return self._badRequest(iq_elt, "No stream method found", profile) | 155 return self._badRequest(client, iq_elt, "No stream method found") |
157 | 156 |
158 if stream_method: | 157 if stream_method: |
159 if stream_method == self.host.plugins["XEP-0065"].NAMESPACE: | 158 if stream_method == self.host.plugins["XEP-0065"].NAMESPACE: |
160 plugin = self.host.plugins["XEP-0065"] | 159 plugin = self.host.plugins["XEP-0065"] |
161 elif stream_method == self.host.plugins["XEP-0047"].NAMESPACE: | 160 elif stream_method == self.host.plugins["XEP-0047"].NAMESPACE: |
162 plugin = self.host.plugins["XEP-0047"] | 161 plugin = self.host.plugins["XEP-0047"] |
163 else: | 162 else: |
164 log.error(u"Unknown stream method, this should not happen at this stage, cancelling transfer") | 163 log.error(u"Unknown stream method, this should not happen at this stage, cancelling transfer") |
165 else: | 164 else: |
166 log.warning(u"Can't find a valid stream method") | 165 log.warning(u"Can't find a valid stream method") |
167 self._si.sendError(iq_elt, 'not-acceptable', profile) | 166 self._si.sendError(client, iq_elt, 'not-acceptable') |
168 return | 167 return |
169 | 168 |
170 #if we are here, the transfer can start, we just need user's agreement | 169 #if we are here, the transfer can start, we just need user's agreement |
171 data = {"name": filename, "peer_jid": peer_jid, "size": file_size, "date": file_date, "hash": file_hash, "desc": file_desc, | 170 data = {"name": filename, "peer_jid": peer_jid, "size": file_size, "date": file_date, "hash": file_hash, "desc": file_desc, |
172 "range": range_, "range_offset": range_offset, "range_length": range_length, | 171 "range": range_, "range_offset": range_offset, "range_length": range_length, |
173 "si_id": si_id, "progress_id": si_id, "stream_method": stream_method, "stream_plugin": plugin} | 172 "si_id": si_id, "progress_id": si_id, "stream_method": stream_method, "stream_plugin": plugin} |
174 | 173 |
175 d = self._f.getDestDir(peer_jid, data, data, profile) | 174 d = self._f.getDestDir(client, peer_jid, data, data, stream_object=True) |
176 d.addCallback(self.confirmationCb, iq_elt, data, profile) | 175 d.addCallback(self.confirmationCb, client, iq_elt, data) |
177 | 176 |
178 def _getFileObject(self, dest_path, can_range=False): | 177 def confirmationCb(self, accepted, client, iq_elt, data): |
179 """Open file, put file pointer to the end if the file if needed | |
180 @param dest_path: path of the destination file | |
181 @param can_range: True if the file pointer can be moved | |
182 @return: File Object""" | |
183 return open(dest_path, "ab" if can_range else "wb") | |
184 | |
185 def confirmationCb(self, accepted, iq_elt, data, profile): | |
186 """Called on confirmation answer | 178 """Called on confirmation answer |
187 | 179 |
188 @param accepted(bool): True if file transfer is accepted | 180 @param accepted(bool): True if file transfer is accepted |
189 @param iq_elt(domish.Element): initial SI request | 181 @param iq_elt(domish.Element): initial SI request |
190 @param data(dict): session data | 182 @param data(dict): session data |
191 @param profile: %(doc_profile)s | |
192 """ | 183 """ |
193 if not accepted: | 184 if not accepted: |
194 log.info(u"File transfer declined") | 185 log.info(u"File transfer declined") |
195 self._si.sendError(iq_elt, 'forbidden', profile) | 186 self._si.sendError(client, iq_elt, 'forbidden') |
196 return | 187 return |
197 # data, timeout, stream_method, failed_methods = client._xep_0096_waiting_for_approval[sid] | 188 # data, timeout, stream_method, failed_methods = client._xep_0096_waiting_for_approval[sid] |
198 # can_range = data['can_range'] == "True" | 189 # can_range = data['can_range'] == "True" |
199 # range_offset = 0 | 190 # range_offset = 0 |
200 # if timeout.active(): | 191 # if timeout.active(): |
214 # del client._xep_0096_waiting_for_approval[sid] | 205 # del client._xep_0096_waiting_for_approval[sid] |
215 # return | 206 # return |
216 | 207 |
217 # file_obj = self._getFileObject(dest_path, can_range) | 208 # file_obj = self._getFileObject(dest_path, can_range) |
218 # range_offset = file_obj.tell() | 209 # range_offset = file_obj.tell() |
219 d = data['stream_plugin'].createSession(data['file_obj'], data['peer_jid'], data['si_id'], profile=profile) | 210 d = data['stream_plugin'].createSession(client, data['stream_object'], data['peer_jid'], data['si_id']) |
220 d.addCallback(self._transferCb, data, profile) | 211 d.addCallback(self._transferCb, client, data) |
221 d.addErrback(self._transferEb, data, profile) | 212 d.addErrback(self._transferEb, client, data) |
222 | 213 |
223 #we can send the iq result | 214 #we can send the iq result |
224 feature_elt = self.host.plugins["XEP-0020"].chooseOption({'stream-method': data['stream_method']}, namespace=None) | 215 feature_elt = self.host.plugins["XEP-0020"].chooseOption({'stream-method': data['stream_method']}, namespace=None) |
225 misc_elts = [] | 216 misc_elts = [] |
226 misc_elts.append(domish.Element((SI_PROFILE, "file"))) | 217 misc_elts.append(domish.Element((SI_PROFILE, "file"))) |
227 # if can_range: | 218 # if can_range: |
228 # range_elt = domish.Element((None, "range")) | 219 # range_elt = domish.Element((None, "range")) |
229 # range_elt['offset'] = str(range_offset) | 220 # range_elt['offset'] = str(range_offset) |
230 # #TODO: manage range length | 221 # #TODO: manage range length |
231 # misc_elts.append(range_elt) | 222 # misc_elts.append(range_elt) |
232 self._si.acceptStream(iq_elt, feature_elt, misc_elts, profile) | 223 self._si.acceptStream(client, iq_elt, feature_elt, misc_elts) |
233 | 224 |
234 def _transferCb(self, dummy, data, profile): | 225 def _transferCb(self, dummy, client, data): |
235 """Called by the stream method when transfer successfuly finished | 226 """Called by the stream method when transfer successfuly finished |
236 | 227 |
237 @param data: session data | 228 @param data: session data |
238 @param profile: %(doc_profile)s | |
239 """ | 229 """ |
240 #TODO: check hash | 230 #TODO: check hash |
241 data['file_obj'].close() | 231 data['stream_object'].close() |
242 log.info(u'Transfer {si_id} successfuly finished'.format(**data)) | 232 log.info(u'Transfer {si_id} successfuly finished'.format(**data)) |
243 | 233 |
244 def _transferEb(self, failure, data, profile): | 234 def _transferEb(self, failure, client, data): |
245 """Called when something went wrong with the transfer | 235 """Called when something went wrong with the transfer |
246 | 236 |
247 @param id: stream id | 237 @param id: stream id |
248 @param data: session data | 238 @param data: session data |
249 @param profile: %(doc_profile)s | |
250 """ | 239 """ |
251 log.warning(u'Transfer {si_id} failed: {reason}'.format(reason=unicode(failure.value), **data)) | 240 log.warning(u'Transfer {si_id} failed: {reason}'.format(reason=unicode(failure.value), **data)) |
252 data['file_obj'].close() | 241 data['stream_object'].close() |
253 | 242 |
254 def _sendFile(self, peer_jid_s, filepath, name, desc, profile=C.PROF_KEY_NONE): | 243 def _sendFile(self, peer_jid_s, filepath, name, desc, profile=C.PROF_KEY_NONE): |
255 return self.sendFile(jid.JID(peer_jid_s), filepath, name or None, desc or None, profile) | 244 client = self.host.getClient(profile) |
256 | 245 return self.sendFile(client, jid.JID(peer_jid_s), filepath, name or None, desc or None) |
257 def sendFile(self, peer_jid, filepath, name=None, desc=None, profile=C.PROF_KEY_NONE): | 246 |
247 def sendFile(self, client, peer_jid, filepath, name=None, desc=None): | |
258 """Send a file using XEP-0096 | 248 """Send a file using XEP-0096 |
259 | 249 |
260 @param peer_jid(jid.JID): recipient | 250 @param peer_jid(jid.JID): recipient |
261 @param filepath(str): absolute path to the file to send | 251 @param filepath(str): absolute path to the file to send |
262 @param name(unicode): name of the file to send | 252 @param name(unicode): name of the file to send |
263 name must not contain "/" characters | 253 name must not contain "/" characters |
264 @param desc: description of the file | 254 @param desc: description of the file |
265 @param profile: %(doc_profile)s | 255 @param profile: %(doc_profile)s |
266 @return: an unique id to identify the transfer | 256 @return: an unique id to identify the transfer |
267 """ | 257 """ |
268 client = self.host.getClient(profile) | |
269 feature_elt = self.host.plugins["XEP-0020"].proposeFeatures({'stream-method': self.managed_stream_m}, namespace=None) | 258 feature_elt = self.host.plugins["XEP-0020"].proposeFeatures({'stream-method': self.managed_stream_m}, namespace=None) |
270 | 259 |
271 file_transfer_elts = [] | 260 file_transfer_elts = [] |
272 | 261 |
273 statinfo = os.stat(filepath) | 262 statinfo = os.stat(filepath) |
280 file_elt.addElement('desc', content=desc) | 269 file_elt.addElement('desc', content=desc) |
281 file_transfer_elts.append(file_elt) | 270 file_transfer_elts.append(file_elt) |
282 | 271 |
283 file_transfer_elts.append(domish.Element((None, 'range'))) | 272 file_transfer_elts.append(domish.Element((None, 'range'))) |
284 | 273 |
285 sid, offer_d = self._si.proposeStream(peer_jid, SI_PROFILE, feature_elt, file_transfer_elts, profile=client.profile) | 274 sid, offer_d = self._si.proposeStream(client, peer_jid, SI_PROFILE, feature_elt, file_transfer_elts) |
286 args = [filepath, sid, size, client] | 275 args = [filepath, sid, size, client] |
287 offer_d.addCallbacks(self._fileCb, self._fileEb, args, None, args) | 276 offer_d.addCallbacks(self._fileCb, self._fileEb, args, None, args) |
288 return sid | 277 return sid |
289 | 278 |
290 def _fileCb(self, result_tuple, filepath, sid, size, client): | 279 def _fileCb(self, result_tuple, filepath, sid, size, client): |
316 plugin = self.host.plugins["XEP-0047"] | 305 plugin = self.host.plugins["XEP-0047"] |
317 else: | 306 else: |
318 log.warning(u"Invalid stream method received") | 307 log.warning(u"Invalid stream method received") |
319 return | 308 return |
320 | 309 |
321 file_obj = self._f.File(self.host, | 310 stream_object = stream.FileStreamObject(self.host, |
322 filepath, | 311 client, |
323 uid=sid, | 312 filepath, |
324 size=size, | 313 uid=sid, |
325 profile=client.profile | 314 size=size, |
326 ) | 315 ) |
327 d = plugin.startStream(file_obj, jid.JID(iq_elt['from']), sid, profile=client.profile) | 316 d = plugin.startStream(client, stream_object, jid.JID(iq_elt['from']), sid) |
328 d.addCallback(self._sendCb, sid, file_obj, client.profile) | 317 d.addCallback(self._sendCb, client, sid, stream_object) |
329 d.addErrback(self._sendEb, sid, file_obj, client.profile) | 318 d.addErrback(self._sendEb, client, sid, stream_object) |
330 | 319 |
331 def _fileEb(self, failure, filepath, sid, size, client): | 320 def _fileEb(self, failure, filepath, sid, size, client): |
332 if failure.check(error.StanzaError): | 321 if failure.check(error.StanzaError): |
333 stanza_err = failure.value | 322 stanza_err = failure.value |
334 if stanza_err.code == '403' and stanza_err.condition == 'forbidden': | 323 if stanza_err.code == '403' and stanza_err.condition == 'forbidden': |
345 elif failure.check(exceptions.DataError): | 334 elif failure.check(exceptions.DataError): |
346 log.warning(u'Invalid stanza received') | 335 log.warning(u'Invalid stanza received') |
347 else: | 336 else: |
348 log.error(u'Error while proposing stream: {}'.format(failure)) | 337 log.error(u'Error while proposing stream: {}'.format(failure)) |
349 | 338 |
350 def _sendCb(self, dummy, sid, file_obj, profile): | 339 def _sendCb(self, dummy, client, sid, stream_object): |
351 log.info(_(u'transfer {sid} successfuly finished [{profile}]').format( | 340 log.info(_(u'transfer {sid} successfuly finished [{profile}]').format( |
352 sid=sid, | 341 sid=sid, |
353 profile=profile)) | 342 profile=client.profile)) |
354 file_obj.close() | 343 stream_object.close() |
355 | 344 |
356 def _sendEb(self, failure, sid, file_obj, profile): | 345 def _sendEb(self, failure, client, sid, stream_object): |
357 log.warning(_(u'transfer {sid} failed [{profile}]: {reason}').format( | 346 log.warning(_(u'transfer {sid} failed [{profile}]: {reason}').format( |
358 sid=sid, | 347 sid=sid, |
359 profile=profile, | 348 profile=client.profile, |
360 reason=unicode(failure.value), | 349 reason=unicode(failure.value), |
361 )) | 350 )) |
362 file_obj.close() | 351 stream_object.close() |