comparison src/plugins/plugin_xep_0096.py @ 2489:e2a7bb875957

plugin pipe/stream, file transfert: refactoring and improvments: this is a big patch as things had to be changed at the same time. - changed methods using profile argument to use client instead - move SatFile in a new tools.stream module, has it should be part of core, not a plugin - new IStreamProducer interface, to handler starting a pull producer - new FileStreamObject which create a stream producer/consumer from a SatFile - plugin pipe is no more using unix named pipe, as it complicate the thing, special care need to be taken to not block, and it's generally not necessary. Instead a socket is now used, so the plugin has been renomed to jingle stream. - bad connection/error should be better handler in jingle stream plugin, and code should not block anymore - jp pipe commands have been updated accordingly fix bug 237
author Goffi <goffi@goffi.org>
date Thu, 08 Feb 2018 00:37:42 +0100
parents 0046283a285d
children 7ad5f2c4e34a
comparison
equal deleted inserted replaced
2488:78c7992a26ed 2489:e2a7bb875957
21 from sat.core.constants import Const as C 21 from sat.core.constants import Const as C
22 from sat.core.log import getLogger 22 from sat.core.log import getLogger
23 log = getLogger(__name__) 23 log = getLogger(__name__)
24 from sat.core import exceptions 24 from sat.core import exceptions
25 from sat.tools import xml_tools 25 from sat.tools import xml_tools
26 from sat.tools import stream
26 from twisted.words.xish import domish 27 from twisted.words.xish import domish
27 from twisted.words.protocols.jabber import jid 28 from twisted.words.protocols.jabber import jid
28 from twisted.words.protocols.jabber import error 29 from twisted.words.protocols.jabber import error
29 import os 30 import os
30 31
61 host.bridge.addMethod("siSendFile", ".plugin", in_sign='sssss', out_sign='s', method=self._sendFile) 62 host.bridge.addMethod("siSendFile", ".plugin", in_sign='sssss', out_sign='s', method=self._sendFile)
62 63
63 def unload(self): 64 def unload(self):
64 self._si.unregisterSIProfile(SI_PROFILE_NAME) 65 self._si.unregisterSIProfile(SI_PROFILE_NAME)
65 66
66 def _badRequest(self, iq_elt, message=None, profile=C.PROF_KEY_NONE): 67 def _badRequest(self, client, iq_elt, message=None):
67 """Send a bad-request error 68 """Send a bad-request error
68 69
69 @param iq_elt(domish.Element): initial <IQ> element of the SI request 70 @param iq_elt(domish.Element): initial <IQ> element of the SI request
70 @param message(None, unicode): informational message to display in the logs 71 @param message(None, unicode): informational message to display in the logs
71 @param profile: %(doc_profile)s
72 """ 72 """
73 if message is not None: 73 if message is not None:
74 log.warning(message) 74 log.warning(message)
75 self._si.sendError(iq_elt, 'bad-request', profile) 75 self._si.sendError(client, iq_elt, 'bad-request')
76 76
77 def _parseRange(self, parent_elt, file_size): 77 def _parseRange(self, parent_elt, file_size):
78 """find and parse <range/> element 78 """find and parse <range/> element
79 79
80 @param parent_elt(domish.Element): direct parent of the <range/> element 80 @param parent_elt(domish.Element): direct parent of the <range/> element
105 if range_offset != 0 or range_length != file_size: 105 if range_offset != 0 or range_length != file_size:
106 raise NotImplementedError # FIXME 106 raise NotImplementedError # FIXME
107 107
108 return range_, range_offset, range_length 108 return range_, range_offset, range_length
109 109
110 def _transferRequest(self, iq_elt, si_id, si_mime_type, si_elt, profile): 110 def _transferRequest(self, client, iq_elt, si_id, si_mime_type, si_elt):
111 """Called when a file transfer is requested 111 """Called when a file transfer is requested
112 112
113 @param iq_elt(domish.Element): initial <IQ> element of the SI request 113 @param iq_elt(domish.Element): initial <IQ> element of the SI request
114 @param si_id(unicode): Stream Initiation session id 114 @param si_id(unicode): Stream Initiation session id
115 @param si_mime_type("unicode"): Mime type of the file (or default "application/octet-stream" if unknown) 115 @param si_mime_type("unicode"): Mime type of the file (or default "application/octet-stream" if unknown)
116 @param si_elt(domish.Element): request 116 @param si_elt(domish.Element): request
117 @param profile: %(doc_profile)s
118 """ 117 """
119 log.info(_("XEP-0096 file transfer requested")) 118 log.info(_("XEP-0096 file transfer requested"))
120 peer_jid = jid.JID(iq_elt['from']) 119 peer_jid = jid.JID(iq_elt['from'])
121 120
122 try: 121 try:
123 file_elt = si_elt.elements(NS_SI_FT, "file").next() 122 file_elt = si_elt.elements(NS_SI_FT, "file").next()
124 except StopIteration: 123 except StopIteration:
125 return self._badRequest(iq_elt, "No <file/> element found in SI File Transfer request", profile) 124 return self._badRequest(client, iq_elt, "No <file/> element found in SI File Transfer request")
126 125
127 try: 126 try:
128 feature_elt = self.host.plugins["XEP-0020"].getFeatureElt(si_elt) 127 feature_elt = self.host.plugins["XEP-0020"].getFeatureElt(si_elt)
129 except exceptions.NotFound: 128 except exceptions.NotFound:
130 return self._badRequest(iq_elt, "No <feature/> element found in SI File Transfer request", profile) 129 return self._badRequest(client, iq_elt, "No <feature/> element found in SI File Transfer request")
131 130
132 try: 131 try:
133 filename = file_elt["name"] 132 filename = file_elt["name"]
134 file_size = int(file_elt["size"]) 133 file_size = int(file_elt["size"])
135 except (KeyError, ValueError): 134 except (KeyError, ValueError):
136 return self._badRequest(iq_elt, "Malformed SI File Transfer request", profile) 135 return self._badRequest(client, iq_elt, "Malformed SI File Transfer request")
137 136
138 file_date = file_elt.getAttribute("date") 137 file_date = file_elt.getAttribute("date")
139 file_hash = file_elt.getAttribute("hash") 138 file_hash = file_elt.getAttribute("hash")
140 139
141 log.info(u"File proposed: name=[{name}] size={size}".format(name=filename, size=file_size)) 140 log.info(u"File proposed: name=[{name}] size={size}".format(name=filename, size=file_size))
146 file_desc = '' 145 file_desc = ''
147 146
148 try: 147 try:
149 range_, range_offset, range_length = self._parseRange(file_elt, file_size) 148 range_, range_offset, range_length = self._parseRange(file_elt, file_size)
150 except ValueError: 149 except ValueError:
151 return self._badRequest(iq_elt, "Malformed SI File Transfer request", profile) 150 return self._badRequest(client, iq_elt, "Malformed SI File Transfer request")
152 151
153 try: 152 try:
154 stream_method = self.host.plugins["XEP-0020"].negotiate(feature_elt, 'stream-method', self.managed_stream_m, namespace=None) 153 stream_method = self.host.plugins["XEP-0020"].negotiate(feature_elt, 'stream-method', self.managed_stream_m, namespace=None)
155 except KeyError: 154 except KeyError:
156 return self._badRequest(iq_elt, "No stream method found", profile) 155 return self._badRequest(client, iq_elt, "No stream method found")
157 156
158 if stream_method: 157 if stream_method:
159 if stream_method == self.host.plugins["XEP-0065"].NAMESPACE: 158 if stream_method == self.host.plugins["XEP-0065"].NAMESPACE:
160 plugin = self.host.plugins["XEP-0065"] 159 plugin = self.host.plugins["XEP-0065"]
161 elif stream_method == self.host.plugins["XEP-0047"].NAMESPACE: 160 elif stream_method == self.host.plugins["XEP-0047"].NAMESPACE:
162 plugin = self.host.plugins["XEP-0047"] 161 plugin = self.host.plugins["XEP-0047"]
163 else: 162 else:
164 log.error(u"Unknown stream method, this should not happen at this stage, cancelling transfer") 163 log.error(u"Unknown stream method, this should not happen at this stage, cancelling transfer")
165 else: 164 else:
166 log.warning(u"Can't find a valid stream method") 165 log.warning(u"Can't find a valid stream method")
167 self._si.sendError(iq_elt, 'not-acceptable', profile) 166 self._si.sendError(client, iq_elt, 'not-acceptable')
168 return 167 return
169 168
170 #if we are here, the transfer can start, we just need user's agreement 169 #if we are here, the transfer can start, we just need user's agreement
171 data = {"name": filename, "peer_jid": peer_jid, "size": file_size, "date": file_date, "hash": file_hash, "desc": file_desc, 170 data = {"name": filename, "peer_jid": peer_jid, "size": file_size, "date": file_date, "hash": file_hash, "desc": file_desc,
172 "range": range_, "range_offset": range_offset, "range_length": range_length, 171 "range": range_, "range_offset": range_offset, "range_length": range_length,
173 "si_id": si_id, "progress_id": si_id, "stream_method": stream_method, "stream_plugin": plugin} 172 "si_id": si_id, "progress_id": si_id, "stream_method": stream_method, "stream_plugin": plugin}
174 173
175 d = self._f.getDestDir(peer_jid, data, data, profile) 174 d = self._f.getDestDir(client, peer_jid, data, data, stream_object=True)
176 d.addCallback(self.confirmationCb, iq_elt, data, profile) 175 d.addCallback(self.confirmationCb, client, iq_elt, data)
177 176
178 def _getFileObject(self, dest_path, can_range=False): 177 def confirmationCb(self, accepted, client, iq_elt, data):
179 """Open file, put file pointer to the end if the file if needed
180 @param dest_path: path of the destination file
181 @param can_range: True if the file pointer can be moved
182 @return: File Object"""
183 return open(dest_path, "ab" if can_range else "wb")
184
185 def confirmationCb(self, accepted, iq_elt, data, profile):
186 """Called on confirmation answer 178 """Called on confirmation answer
187 179
188 @param accepted(bool): True if file transfer is accepted 180 @param accepted(bool): True if file transfer is accepted
189 @param iq_elt(domish.Element): initial SI request 181 @param iq_elt(domish.Element): initial SI request
190 @param data(dict): session data 182 @param data(dict): session data
191 @param profile: %(doc_profile)s
192 """ 183 """
193 if not accepted: 184 if not accepted:
194 log.info(u"File transfer declined") 185 log.info(u"File transfer declined")
195 self._si.sendError(iq_elt, 'forbidden', profile) 186 self._si.sendError(client, iq_elt, 'forbidden')
196 return 187 return
197 # data, timeout, stream_method, failed_methods = client._xep_0096_waiting_for_approval[sid] 188 # data, timeout, stream_method, failed_methods = client._xep_0096_waiting_for_approval[sid]
198 # can_range = data['can_range'] == "True" 189 # can_range = data['can_range'] == "True"
199 # range_offset = 0 190 # range_offset = 0
200 # if timeout.active(): 191 # if timeout.active():
214 # del client._xep_0096_waiting_for_approval[sid] 205 # del client._xep_0096_waiting_for_approval[sid]
215 # return 206 # return
216 207
217 # file_obj = self._getFileObject(dest_path, can_range) 208 # file_obj = self._getFileObject(dest_path, can_range)
218 # range_offset = file_obj.tell() 209 # range_offset = file_obj.tell()
219 d = data['stream_plugin'].createSession(data['file_obj'], data['peer_jid'], data['si_id'], profile=profile) 210 d = data['stream_plugin'].createSession(client, data['stream_object'], data['peer_jid'], data['si_id'])
220 d.addCallback(self._transferCb, data, profile) 211 d.addCallback(self._transferCb, client, data)
221 d.addErrback(self._transferEb, data, profile) 212 d.addErrback(self._transferEb, client, data)
222 213
223 #we can send the iq result 214 #we can send the iq result
224 feature_elt = self.host.plugins["XEP-0020"].chooseOption({'stream-method': data['stream_method']}, namespace=None) 215 feature_elt = self.host.plugins["XEP-0020"].chooseOption({'stream-method': data['stream_method']}, namespace=None)
225 misc_elts = [] 216 misc_elts = []
226 misc_elts.append(domish.Element((SI_PROFILE, "file"))) 217 misc_elts.append(domish.Element((SI_PROFILE, "file")))
227 # if can_range: 218 # if can_range:
228 # range_elt = domish.Element((None, "range")) 219 # range_elt = domish.Element((None, "range"))
229 # range_elt['offset'] = str(range_offset) 220 # range_elt['offset'] = str(range_offset)
230 # #TODO: manage range length 221 # #TODO: manage range length
231 # misc_elts.append(range_elt) 222 # misc_elts.append(range_elt)
232 self._si.acceptStream(iq_elt, feature_elt, misc_elts, profile) 223 self._si.acceptStream(client, iq_elt, feature_elt, misc_elts)
233 224
234 def _transferCb(self, dummy, data, profile): 225 def _transferCb(self, dummy, client, data):
235 """Called by the stream method when transfer successfuly finished 226 """Called by the stream method when transfer successfuly finished
236 227
237 @param data: session data 228 @param data: session data
238 @param profile: %(doc_profile)s
239 """ 229 """
240 #TODO: check hash 230 #TODO: check hash
241 data['file_obj'].close() 231 data['stream_object'].close()
242 log.info(u'Transfer {si_id} successfuly finished'.format(**data)) 232 log.info(u'Transfer {si_id} successfuly finished'.format(**data))
243 233
244 def _transferEb(self, failure, data, profile): 234 def _transferEb(self, failure, client, data):
245 """Called when something went wrong with the transfer 235 """Called when something went wrong with the transfer
246 236
247 @param id: stream id 237 @param id: stream id
248 @param data: session data 238 @param data: session data
249 @param profile: %(doc_profile)s
250 """ 239 """
251 log.warning(u'Transfer {si_id} failed: {reason}'.format(reason=unicode(failure.value), **data)) 240 log.warning(u'Transfer {si_id} failed: {reason}'.format(reason=unicode(failure.value), **data))
252 data['file_obj'].close() 241 data['stream_object'].close()
253 242
254 def _sendFile(self, peer_jid_s, filepath, name, desc, profile=C.PROF_KEY_NONE): 243 def _sendFile(self, peer_jid_s, filepath, name, desc, profile=C.PROF_KEY_NONE):
255 return self.sendFile(jid.JID(peer_jid_s), filepath, name or None, desc or None, profile) 244 client = self.host.getClient(profile)
256 245 return self.sendFile(client, jid.JID(peer_jid_s), filepath, name or None, desc or None)
257 def sendFile(self, peer_jid, filepath, name=None, desc=None, profile=C.PROF_KEY_NONE): 246
247 def sendFile(self, client, peer_jid, filepath, name=None, desc=None):
258 """Send a file using XEP-0096 248 """Send a file using XEP-0096
259 249
260 @param peer_jid(jid.JID): recipient 250 @param peer_jid(jid.JID): recipient
261 @param filepath(str): absolute path to the file to send 251 @param filepath(str): absolute path to the file to send
262 @param name(unicode): name of the file to send 252 @param name(unicode): name of the file to send
263 name must not contain "/" characters 253 name must not contain "/" characters
264 @param desc: description of the file 254 @param desc: description of the file
265 @param profile: %(doc_profile)s 255 @param profile: %(doc_profile)s
266 @return: an unique id to identify the transfer 256 @return: an unique id to identify the transfer
267 """ 257 """
268 client = self.host.getClient(profile)
269 feature_elt = self.host.plugins["XEP-0020"].proposeFeatures({'stream-method': self.managed_stream_m}, namespace=None) 258 feature_elt = self.host.plugins["XEP-0020"].proposeFeatures({'stream-method': self.managed_stream_m}, namespace=None)
270 259
271 file_transfer_elts = [] 260 file_transfer_elts = []
272 261
273 statinfo = os.stat(filepath) 262 statinfo = os.stat(filepath)
280 file_elt.addElement('desc', content=desc) 269 file_elt.addElement('desc', content=desc)
281 file_transfer_elts.append(file_elt) 270 file_transfer_elts.append(file_elt)
282 271
283 file_transfer_elts.append(domish.Element((None, 'range'))) 272 file_transfer_elts.append(domish.Element((None, 'range')))
284 273
285 sid, offer_d = self._si.proposeStream(peer_jid, SI_PROFILE, feature_elt, file_transfer_elts, profile=client.profile) 274 sid, offer_d = self._si.proposeStream(client, peer_jid, SI_PROFILE, feature_elt, file_transfer_elts)
286 args = [filepath, sid, size, client] 275 args = [filepath, sid, size, client]
287 offer_d.addCallbacks(self._fileCb, self._fileEb, args, None, args) 276 offer_d.addCallbacks(self._fileCb, self._fileEb, args, None, args)
288 return sid 277 return sid
289 278
290 def _fileCb(self, result_tuple, filepath, sid, size, client): 279 def _fileCb(self, result_tuple, filepath, sid, size, client):
316 plugin = self.host.plugins["XEP-0047"] 305 plugin = self.host.plugins["XEP-0047"]
317 else: 306 else:
318 log.warning(u"Invalid stream method received") 307 log.warning(u"Invalid stream method received")
319 return 308 return
320 309
321 file_obj = self._f.File(self.host, 310 stream_object = stream.FileStreamObject(self.host,
322 filepath, 311 client,
323 uid=sid, 312 filepath,
324 size=size, 313 uid=sid,
325 profile=client.profile 314 size=size,
326 ) 315 )
327 d = plugin.startStream(file_obj, jid.JID(iq_elt['from']), sid, profile=client.profile) 316 d = plugin.startStream(client, stream_object, jid.JID(iq_elt['from']), sid)
328 d.addCallback(self._sendCb, sid, file_obj, client.profile) 317 d.addCallback(self._sendCb, client, sid, stream_object)
329 d.addErrback(self._sendEb, sid, file_obj, client.profile) 318 d.addErrback(self._sendEb, client, sid, stream_object)
330 319
331 def _fileEb(self, failure, filepath, sid, size, client): 320 def _fileEb(self, failure, filepath, sid, size, client):
332 if failure.check(error.StanzaError): 321 if failure.check(error.StanzaError):
333 stanza_err = failure.value 322 stanza_err = failure.value
334 if stanza_err.code == '403' and stanza_err.condition == 'forbidden': 323 if stanza_err.code == '403' and stanza_err.condition == 'forbidden':
345 elif failure.check(exceptions.DataError): 334 elif failure.check(exceptions.DataError):
346 log.warning(u'Invalid stanza received') 335 log.warning(u'Invalid stanza received')
347 else: 336 else:
348 log.error(u'Error while proposing stream: {}'.format(failure)) 337 log.error(u'Error while proposing stream: {}'.format(failure))
349 338
350 def _sendCb(self, dummy, sid, file_obj, profile): 339 def _sendCb(self, dummy, client, sid, stream_object):
351 log.info(_(u'transfer {sid} successfuly finished [{profile}]').format( 340 log.info(_(u'transfer {sid} successfuly finished [{profile}]').format(
352 sid=sid, 341 sid=sid,
353 profile=profile)) 342 profile=client.profile))
354 file_obj.close() 343 stream_object.close()
355 344
356 def _sendEb(self, failure, sid, file_obj, profile): 345 def _sendEb(self, failure, client, sid, stream_object):
357 log.warning(_(u'transfer {sid} failed [{profile}]: {reason}').format( 346 log.warning(_(u'transfer {sid} failed [{profile}]: {reason}').format(
358 sid=sid, 347 sid=sid,
359 profile=profile, 348 profile=client.profile,
360 reason=unicode(failure.value), 349 reason=unicode(failure.value),
361 )) 350 ))
362 file_obj.close() 351 stream_object.close()