comparison src/plugins/plugin_xep_0260.py @ 2489:e2a7bb875957

plugin pipe/stream, file transfer: refactoring and improvements: this is a big patch as things had to be changed at the same time. - changed methods using profile argument to use client instead - moved SatFile to a new tools.stream module, as it should be part of core, not a plugin - new IStreamProducer interface, to handle starting a pull producer - new FileStreamObject which creates a stream producer/consumer from a SatFile - plugin pipe no longer uses unix named pipes, as they complicate things: special care needs to be taken not to block, and it's generally not necessary. Instead a socket is now used, so the plugin has been renamed to jingle stream. - bad connections/errors should be better handled in jingle stream plugin, and code should not block anymore - jp pipe commands have been updated accordingly fix bug 237
author Goffi <goffi@goffi.org>
date Thu, 08 Feb 2018 00:37:42 +0100
parents 0046283a285d
children 67cc54b01a12
comparison
equal deleted inserted replaced
2488:78c7992a26ed 2489:e2a7bb875957
125 candidate_elt['priority'] = unicode(candidate.priority) 125 candidate_elt['priority'] = unicode(candidate.priority)
126 candidate_elt['type'] = candidate.type 126 candidate_elt['type'] = candidate.type
127 return transport_elt 127 return transport_elt
128 128
129 @defer.inlineCallbacks 129 @defer.inlineCallbacks
130 def jingleSessionInit(self, session, content_name, profile): 130 def jingleSessionInit(self, client, session, content_name):
131 client = self.host.getClient(profile)
132 content_data = session['contents'][content_name] 131 content_data = session['contents'][content_name]
133 transport_data = content_data['transport_data'] 132 transport_data = content_data['transport_data']
134 sid = transport_data['sid'] = unicode(uuid.uuid4()) 133 sid = transport_data['sid'] = unicode(uuid.uuid4())
135 session_hash = transport_data['session_hash'] = self._s5b.getSessionHash(client.jid, session['peer_jid'], sid) 134 session_hash = transport_data['session_hash'] = self._s5b.getSessionHash(client.jid, session['peer_jid'], sid)
136 transport_data['peer_session_hash'] = self._s5b.getSessionHash(session['peer_jid'], client.jid, sid) # requester and target are inversed for peer candidates 135 transport_data['peer_session_hash'] = self._s5b.getSessionHash(session['peer_jid'], client.jid, sid) # requester and target are inversed for peer candidates
137 transport_data['stream_d'] = self._s5b.registerHash(session_hash, None, profile) 136 transport_data['stream_d'] = self._s5b.registerHash(client, session_hash, None)
138 candidates = transport_data['candidates'] = yield self._s5b.getCandidates(profile) 137 candidates = transport_data['candidates'] = yield self._s5b.getCandidates(client)
139 mode = 'tcp' # XXX: we only manage tcp for now 138 mode = 'tcp' # XXX: we only manage tcp for now
140 transport_elt = self._buildCandidates(session, candidates, sid, session_hash, client, mode) 139 transport_elt = self._buildCandidates(session, candidates, sid, session_hash, client, mode)
141 140
142 defer.returnValue(transport_elt) 141 defer.returnValue(transport_elt)
143 142
144 def _proxyActivatedCb(self, iq_result_elt, candidate, session, content_name, profile): 143 def _proxyActivatedCb(self, iq_result_elt, client, candidate, session, content_name):
145 """Called when activation confirmation has been received from proxy 144 """Called when activation confirmation has been received from proxy
146 145
147 cf XEP-0260 § 2.4 146 cf XEP-0260 § 2.4
148 """ 147 """
149 # now that the proxy is activated, we have to inform other peer 148 # now that the proxy is activated, we have to inform other peer
150 iq_elt, transport_elt = self._j.buildAction(self._j.A_TRANSPORT_INFO, session, content_name, profile) 149 iq_elt, transport_elt = self._j.buildAction(client, self._j.A_TRANSPORT_INFO, session, content_name)
151 activated_elt = transport_elt.addElement('activated') 150 activated_elt = transport_elt.addElement('activated')
152 activated_elt['cid'] = candidate.id 151 activated_elt['cid'] = candidate.id
153 iq_elt.send() 152 iq_elt.send()
154 153
155 def _proxyActivatedEb(self, stanza_error, candidate, session, content_name, profile): 154 def _proxyActivatedEb(self, stanza_error, client, candidate, session, content_name):
156 """Called when activation error has been received from proxy 155 """Called when activation error has been received from proxy
157 156
158 cf XEP-0260 § 2.4 157 cf XEP-0260 § 2.4
159 """ 158 """
160 # TODO: fallback to IBB 159 # TODO: fallback to IBB
161 # now that the proxy is activated, we have to inform other peer 160 # now that the proxy is activated, we have to inform other peer
162 iq_elt, transport_elt = self._j.buildAction(self._j.A_TRANSPORT_INFO, session, content_name, profile) 161 iq_elt, transport_elt = self._j.buildAction(client, self._j.A_TRANSPORT_INFO, session, content_name)
163 transport_elt.addElement('proxy-error') 162 transport_elt.addElement('proxy-error')
164 iq_elt.send() 163 iq_elt.send()
165 log.warning(u"Can't activate proxy, we need to fallback to IBB: {reason}" 164 log.warning(u"Can't activate proxy, we need to fallback to IBB: {reason}"
166 .format(reason = stanza_error.value.condition)) 165 .format(reason = stanza_error.value.condition))
167 client = self.host.getClient(profile)
168 self.doFallback(session, content_name, client) 166 self.doFallback(session, content_name, client)
169 167
170 def _foundPeerCandidate(self, candidate, session, transport_data, content_name, client): 168 def _foundPeerCandidate(self, candidate, session, transport_data, content_name, client):
171 """Called when the best candidate from other peer is found 169 """Called when the best candidate from other peer is found
172 170
183 for c in transport_data['peer_candidates']: 181 for c in transport_data['peer_candidates']:
184 if c is None or c is candidate: 182 if c is None or c is candidate:
185 continue 183 continue
186 c.discard() 184 c.discard()
187 del transport_data['peer_candidates'] 185 del transport_data['peer_candidates']
188 iq_elt, transport_elt = self._j.buildAction(self._j.A_TRANSPORT_INFO, session, content_name, client.profile) 186 iq_elt, transport_elt = self._j.buildAction(client, self._j.A_TRANSPORT_INFO, session, content_name)
189 if candidate is None: 187 if candidate is None:
190 log.warning(u"Can't connect to any peer candidate") 188 log.warning(u"Can't connect to any peer candidate")
191 candidate_elt = transport_elt.addElement('candidate-error') 189 candidate_elt = transport_elt.addElement('candidate-error')
192 else: 190 else:
193 log.info(u"Found best peer candidate: {}".format(unicode(candidate))) 191 log.info(u"Found best peer candidate: {}".format(unicode(candidate)))
252 candidate = choosed_candidate)) 250 candidate = choosed_candidate))
253 del transport_data['best_candidate'] 251 del transport_data['best_candidate']
254 del transport_data['peer_best_candidate'] 252 del transport_data['peer_best_candidate']
255 253
256 if choosed_candidate.type == self._s5b.TYPE_PROXY: 254 if choosed_candidate.type == self._s5b.TYPE_PROXY:
257 # the file transfer need to wait for proxy activation 255 # the stream transfer need to wait for proxy activation
258 # (see XEP-0260 § 2.4) 256 # (see XEP-0260 § 2.4)
259 if our_candidate: 257 if our_candidate:
260 d = self._s5b.connectCandidate(choosed_candidate, transport_data['session_hash'], profile=client.profile) 258 d = self._s5b.connectCandidate(client, choosed_candidate, transport_data['session_hash'])
261 d.addCallback(lambda dummy: choosed_candidate.activate(transport_data['sid'], session['peer_jid'], client)) 259 d.addCallback(lambda dummy: choosed_candidate.activate(transport_data['sid'], session['peer_jid'], client))
262 args = [choosed_candidate, session, content_name, client.profile] 260 args = [client, choosed_candidate, session, content_name]
263 d.addCallbacks(self._proxyActivatedCb, self._proxyActivatedEb, args, None, args) 261 d.addCallbacks(self._proxyActivatedCb, self._proxyActivatedEb, args, None, args)
264 else: 262 else:
265 # this Deferred will be called when we'll receive activation confirmation from other peer 263 # this Deferred will be called when we'll receive activation confirmation from other peer
266 d = transport_data['activation_d'] = defer.Deferred() 264 d = transport_data['activation_d'] = defer.Deferred()
267 else: 265 else:
268 d = defer.succeed(None) 266 d = defer.succeed(None)
269 267
270 if content_data['senders'] == session['role']: 268 if content_data['senders'] == session['role']:
271 # we can now start the file transfer (or start it after proxy activation) 269 # we can now start the stream transfer (or start it after proxy activation)
272 d.addCallback(lambda dummy: choosed_candidate.startTransfer(transport_data['session_hash'])) 270 d.addCallback(lambda dummy: choosed_candidate.startTransfer(transport_data['session_hash']))
273 d.addErrback(self._startEb, session, content_name, client) 271 d.addErrback(self._startEb, session, content_name, client)
274 272
275 def _startEb(self, fail, session, content_name, client): 273 def _startEb(self, fail, session, content_name, client):
276 """Called when it's not possible to start the transfer 274 """Called when it's not possible to start the transfer
340 activation_d.callback(None) 338 activation_d.callback(None)
341 else: 339 else:
342 activation_d.errback(ProxyError()) 340 activation_d.errback(ProxyError())
343 341
344 @defer.inlineCallbacks 342 @defer.inlineCallbacks
345 def jingleHandler(self, action, session, content_name, transport_elt, profile): 343 def jingleHandler(self, client, action, session, content_name, transport_elt):
346 client = self.host.getClient(profile)
347 content_data = session['contents'][content_name] 344 content_data = session['contents'][content_name]
348 transport_data = content_data['transport_data'] 345 transport_data = content_data['transport_data']
349 346
350 if action in (self._j.A_ACCEPTED_ACK, self._j.A_PREPARE_RESPONDER): 347 if action in (self._j.A_ACCEPTED_ACK, self._j.A_PREPARE_RESPONDER):
351 pass 348 pass
356 transport_data['peer_candidates'] = self._parseCandidates(transport_elt) 353 transport_data['peer_candidates'] = self._parseCandidates(transport_elt)
357 354
358 elif action == self._j.A_START: 355 elif action == self._j.A_START:
359 session_hash = transport_data['session_hash'] 356 session_hash = transport_data['session_hash']
360 peer_candidates = transport_data['peer_candidates'] 357 peer_candidates = transport_data['peer_candidates']
361 file_obj = content_data['file_obj'] 358 stream_object = content_data['stream_object']
362 self._s5b.associateFileObj(session_hash, file_obj, profile) 359 self._s5b.associateStreamObject(client, session_hash, stream_object)
363 stream_d = transport_data.pop('stream_d') 360 stream_d = transport_data.pop('stream_d')
364 stream_d.chainDeferred(content_data['finished_d']) 361 stream_d.chainDeferred(content_data['finished_d'])
365 peer_session_hash = transport_data['peer_session_hash'] 362 peer_session_hash = transport_data['peer_session_hash']
366 d = self._s5b.getBestCandidate(peer_candidates, session_hash, peer_session_hash, profile) 363 d = self._s5b.getBestCandidate(client, peer_candidates, session_hash, peer_session_hash)
367 d.addCallback(self._foundPeerCandidate, session, transport_data, content_name, client) 364 d.addCallback(self._foundPeerCandidate, session, transport_data, content_name, client)
368 365
369 elif action == self._j.A_SESSION_INITIATE: 366 elif action == self._j.A_SESSION_INITIATE:
370 # responder side, we select a candidate in the ones sent by initiator 367 # responder side, we select a candidate in the ones sent by initiator
371 # and we give our candidates 368 # and we give our candidates
372 assert 'peer_candidates' not in transport_data 369 assert 'peer_candidates' not in transport_data
373 sid = transport_data['sid'] = transport_elt['sid'] 370 sid = transport_data['sid'] = transport_elt['sid']
374 session_hash = transport_data['session_hash'] = self._s5b.getSessionHash(client.jid, session['peer_jid'], sid) 371 session_hash = transport_data['session_hash'] = self._s5b.getSessionHash(client.jid, session['peer_jid'], sid)
375 peer_session_hash = transport_data['peer_session_hash'] = self._s5b.getSessionHash(session['peer_jid'], client.jid, sid) # requester and target are inversed for peer candidates 372 peer_session_hash = transport_data['peer_session_hash'] = self._s5b.getSessionHash(session['peer_jid'], client.jid, sid) # requester and target are inversed for peer candidates
376 peer_candidates = transport_data['peer_candidates'] = self._parseCandidates(transport_elt) 373 peer_candidates = transport_data['peer_candidates'] = self._parseCandidates(transport_elt)
377 file_obj = content_data['file_obj'] 374 stream_object = content_data['stream_object']
378 stream_d = self._s5b.registerHash(session_hash, file_obj, profile) 375 stream_d = self._s5b.registerHash(client, session_hash, stream_object)
379 stream_d.chainDeferred(content_data['finished_d']) 376 stream_d.chainDeferred(content_data['finished_d'])
380 d = self._s5b.getBestCandidate(peer_candidates, session_hash, peer_session_hash, profile) 377 d = self._s5b.getBestCandidate(client, peer_candidates, session_hash, peer_session_hash)
381 d.addCallback(self._foundPeerCandidate, session, transport_data, content_name, client) 378 d.addCallback(self._foundPeerCandidate, session, transport_data, content_name, client)
382 candidates = yield self._s5b.getCandidates(profile) 379 candidates = yield self._s5b.getCandidates(client)
383 # we remove duplicate candidates 380 # we remove duplicate candidates
384 candidates = [candidate for candidate in candidates if candidate not in peer_candidates] 381 candidates = [candidate for candidate in candidates if candidate not in peer_candidates]
385 382
386 transport_data['candidates'] = candidates 383 transport_data['candidates'] = candidates
387 # we can now build a new <transport> element with our candidates 384 # we can now build a new <transport> element with our candidates
411 else: 408 else:
412 log.warning(u"FIXME: unmanaged action {}".format(action)) 409 log.warning(u"FIXME: unmanaged action {}".format(action))
413 410
414 defer.returnValue(transport_elt) 411 defer.returnValue(transport_elt)
415 412
416 def jingleTerminate(self, action, session, content_name, reason_elt, profile): 413 def jingleTerminate(self, client, action, session, content_name, reason_elt):
417 if reason_elt.decline: 414 if reason_elt.decline:
418 log.debug(u"Session declined, deleting S5B session") 415 log.debug(u"Session declined, deleting S5B session")
419 # we just need to clean the S5B session if it is declined 416 # we just need to clean the S5B session if it is declined
420 client = self.host.getClient(profile)
421 content_data = session['contents'][content_name] 417 content_data = session['contents'][content_name]
422 transport_data = content_data['transport_data'] 418 transport_data = content_data['transport_data']
423 self._s5b.killSession(None, transport_data['session_hash'], None, client) 419 self._s5b.killSession(None, transport_data['session_hash'], None, client)
424 420
425 def _doFallback(self, feature_checked, session, content_name, client): 421 def _doFallback(self, feature_checked, session, content_name, client):
427 423
428 @param feature_checked(bool): True if other peer can do IBB 424 @param feature_checked(bool): True if other peer can do IBB
429 """ 425 """
430 if not feature_checked: 426 if not feature_checked:
431 log.warning(u"Other peer can't manage jingle IBB, be have to terminate the session") 427 log.warning(u"Other peer can't manage jingle IBB, be have to terminate the session")
432 self._j.terminate(self._j.REASON_CONNECTIVITY_ERROR, session, client.profile) 428 self._j.terminate(client, self._j.REASON_CONNECTIVITY_ERROR, session)
433 else: 429 else:
434 self._j.transportReplace(self._jingle_ibb.NAMESPACE, session, content_name, client.profile) 430 self._j.transportReplace(client, self._jingle_ibb.NAMESPACE, session, content_name)
435 431
436 def doFallback(self, session, content_name, client): 432 def doFallback(self, session, content_name, client):
437 """Fallback to IBB transport, used in last resort 433 """Fallback to IBB transport, used in last resort
438 434
439 @param session(dict): session data 435 @param session(dict): session data
443 if session['role'] != self._j.ROLE_INITIATOR: 439 if session['role'] != self._j.ROLE_INITIATOR:
444 # only initiator must do the fallback, see XEP-0260 §3 440 # only initiator must do the fallback, see XEP-0260 §3
445 return 441 return
446 if self._jingle_ibb is None: 442 if self._jingle_ibb is None:
447 log.warning(u"Jingle IBB (XEP-0261) plugin is not available, we have to close the session") 443 log.warning(u"Jingle IBB (XEP-0261) plugin is not available, we have to close the session")
448 self._j.terminate(self._j.REASON_CONNECTIVITY_ERROR, session, client.profile) 444 self._j.terminate(client, self._j.REASON_CONNECTIVITY_ERROR, session)
449 else: 445 else:
450 d = self.host.hasFeature(client, self._jingle_ibb.NAMESPACE, session['peer_jid']) 446 d = self.host.hasFeature(client, self._jingle_ibb.NAMESPACE, session['peer_jid'])
451 d.addCallback(self._doFallback, session, content_name, client) 447 d.addCallback(self._doFallback, session, content_name, client)
452 return d 448 return d
453 449