Mercurial > libervia-backend
comparison src/plugins/plugin_xep_0260.py @ 2489:e2a7bb875957
plugin pipe/stream, file transfer: refactoring and improvements:
this is a big patch as things had to be changed at the same time.
- changed methods using profile argument to use client instead
- moved SatFile to a new tools.stream module, as it should be part of core, not a plugin
- new IStreamProducer interface, to handle starting a pull producer
- new FileStreamObject which creates a stream producer/consumer from a SatFile
- plugin pipe no longer uses a unix named pipe, as it complicates things:
special care needs to be taken not to block, and it's generally not necessary.
Instead a socket is now used, so the plugin has been renamed to jingle stream.
- bad connections/errors should be better handled in the jingle stream plugin, and code should not block anymore
- jp pipe commands have been updated accordingly
fix bug 237
author | Goffi <goffi@goffi.org> |
---|---|
date | Thu, 08 Feb 2018 00:37:42 +0100 |
parents | 0046283a285d |
children | 67cc54b01a12 |
comparison
equal
deleted
inserted
replaced
2488:78c7992a26ed | 2489:e2a7bb875957 |
---|---|
125 candidate_elt['priority'] = unicode(candidate.priority) | 125 candidate_elt['priority'] = unicode(candidate.priority) |
126 candidate_elt['type'] = candidate.type | 126 candidate_elt['type'] = candidate.type |
127 return transport_elt | 127 return transport_elt |
128 | 128 |
129 @defer.inlineCallbacks | 129 @defer.inlineCallbacks |
130 def jingleSessionInit(self, session, content_name, profile): | 130 def jingleSessionInit(self, client, session, content_name): |
131 client = self.host.getClient(profile) | |
132 content_data = session['contents'][content_name] | 131 content_data = session['contents'][content_name] |
133 transport_data = content_data['transport_data'] | 132 transport_data = content_data['transport_data'] |
134 sid = transport_data['sid'] = unicode(uuid.uuid4()) | 133 sid = transport_data['sid'] = unicode(uuid.uuid4()) |
135 session_hash = transport_data['session_hash'] = self._s5b.getSessionHash(client.jid, session['peer_jid'], sid) | 134 session_hash = transport_data['session_hash'] = self._s5b.getSessionHash(client.jid, session['peer_jid'], sid) |
136 transport_data['peer_session_hash'] = self._s5b.getSessionHash(session['peer_jid'], client.jid, sid) # requester and target are inversed for peer candidates | 135 transport_data['peer_session_hash'] = self._s5b.getSessionHash(session['peer_jid'], client.jid, sid) # requester and target are inversed for peer candidates |
137 transport_data['stream_d'] = self._s5b.registerHash(session_hash, None, profile) | 136 transport_data['stream_d'] = self._s5b.registerHash(client, session_hash, None) |
138 candidates = transport_data['candidates'] = yield self._s5b.getCandidates(profile) | 137 candidates = transport_data['candidates'] = yield self._s5b.getCandidates(client) |
139 mode = 'tcp' # XXX: we only manage tcp for now | 138 mode = 'tcp' # XXX: we only manage tcp for now |
140 transport_elt = self._buildCandidates(session, candidates, sid, session_hash, client, mode) | 139 transport_elt = self._buildCandidates(session, candidates, sid, session_hash, client, mode) |
141 | 140 |
142 defer.returnValue(transport_elt) | 141 defer.returnValue(transport_elt) |
143 | 142 |
144 def _proxyActivatedCb(self, iq_result_elt, candidate, session, content_name, profile): | 143 def _proxyActivatedCb(self, iq_result_elt, client, candidate, session, content_name): |
145 """Called when activation confirmation has been received from proxy | 144 """Called when activation confirmation has been received from proxy |
146 | 145 |
147 cf XEP-0260 § 2.4 | 146 cf XEP-0260 § 2.4 |
148 """ | 147 """ |
149 # now that the proxy is activated, we have to inform other peer | 148 # now that the proxy is activated, we have to inform other peer |
150 iq_elt, transport_elt = self._j.buildAction(self._j.A_TRANSPORT_INFO, session, content_name, profile) | 149 iq_elt, transport_elt = self._j.buildAction(client, self._j.A_TRANSPORT_INFO, session, content_name) |
151 activated_elt = transport_elt.addElement('activated') | 150 activated_elt = transport_elt.addElement('activated') |
152 activated_elt['cid'] = candidate.id | 151 activated_elt['cid'] = candidate.id |
153 iq_elt.send() | 152 iq_elt.send() |
154 | 153 |
155 def _proxyActivatedEb(self, stanza_error, candidate, session, content_name, profile): | 154 def _proxyActivatedEb(self, stanza_error, client, candidate, session, content_name): |
156 """Called when activation error has been received from proxy | 155 """Called when activation error has been received from proxy |
157 | 156 |
158 cf XEP-0260 § 2.4 | 157 cf XEP-0260 § 2.4 |
159 """ | 158 """ |
160 # TODO: fallback to IBB | 159 # TODO: fallback to IBB |
161 # now that the proxy is activated, we have to inform other peer | 160 # now that the proxy is activated, we have to inform other peer |
162 iq_elt, transport_elt = self._j.buildAction(self._j.A_TRANSPORT_INFO, session, content_name, profile) | 161 iq_elt, transport_elt = self._j.buildAction(client, self._j.A_TRANSPORT_INFO, session, content_name) |
163 transport_elt.addElement('proxy-error') | 162 transport_elt.addElement('proxy-error') |
164 iq_elt.send() | 163 iq_elt.send() |
165 log.warning(u"Can't activate proxy, we need to fallback to IBB: {reason}" | 164 log.warning(u"Can't activate proxy, we need to fallback to IBB: {reason}" |
166 .format(reason = stanza_error.value.condition)) | 165 .format(reason = stanza_error.value.condition)) |
167 client = self.host.getClient(profile) | |
168 self.doFallback(session, content_name, client) | 166 self.doFallback(session, content_name, client) |
169 | 167 |
170 def _foundPeerCandidate(self, candidate, session, transport_data, content_name, client): | 168 def _foundPeerCandidate(self, candidate, session, transport_data, content_name, client): |
171 """Called when the best candidate from other peer is found | 169 """Called when the best candidate from other peer is found |
172 | 170 |
183 for c in transport_data['peer_candidates']: | 181 for c in transport_data['peer_candidates']: |
184 if c is None or c is candidate: | 182 if c is None or c is candidate: |
185 continue | 183 continue |
186 c.discard() | 184 c.discard() |
187 del transport_data['peer_candidates'] | 185 del transport_data['peer_candidates'] |
188 iq_elt, transport_elt = self._j.buildAction(self._j.A_TRANSPORT_INFO, session, content_name, client.profile) | 186 iq_elt, transport_elt = self._j.buildAction(client, self._j.A_TRANSPORT_INFO, session, content_name) |
189 if candidate is None: | 187 if candidate is None: |
190 log.warning(u"Can't connect to any peer candidate") | 188 log.warning(u"Can't connect to any peer candidate") |
191 candidate_elt = transport_elt.addElement('candidate-error') | 189 candidate_elt = transport_elt.addElement('candidate-error') |
192 else: | 190 else: |
193 log.info(u"Found best peer candidate: {}".format(unicode(candidate))) | 191 log.info(u"Found best peer candidate: {}".format(unicode(candidate))) |
252 candidate = choosed_candidate)) | 250 candidate = choosed_candidate)) |
253 del transport_data['best_candidate'] | 251 del transport_data['best_candidate'] |
254 del transport_data['peer_best_candidate'] | 252 del transport_data['peer_best_candidate'] |
255 | 253 |
256 if choosed_candidate.type == self._s5b.TYPE_PROXY: | 254 if choosed_candidate.type == self._s5b.TYPE_PROXY: |
257 # the file transfer need to wait for proxy activation | 255 # the stream transfer need to wait for proxy activation |
258 # (see XEP-0260 § 2.4) | 256 # (see XEP-0260 § 2.4) |
259 if our_candidate: | 257 if our_candidate: |
260 d = self._s5b.connectCandidate(choosed_candidate, transport_data['session_hash'], profile=client.profile) | 258 d = self._s5b.connectCandidate(client, choosed_candidate, transport_data['session_hash']) |
261 d.addCallback(lambda dummy: choosed_candidate.activate(transport_data['sid'], session['peer_jid'], client)) | 259 d.addCallback(lambda dummy: choosed_candidate.activate(transport_data['sid'], session['peer_jid'], client)) |
262 args = [choosed_candidate, session, content_name, client.profile] | 260 args = [client, choosed_candidate, session, content_name] |
263 d.addCallbacks(self._proxyActivatedCb, self._proxyActivatedEb, args, None, args) | 261 d.addCallbacks(self._proxyActivatedCb, self._proxyActivatedEb, args, None, args) |
264 else: | 262 else: |
265 # this Deferred will be called when we'll receive activation confirmation from other peer | 263 # this Deferred will be called when we'll receive activation confirmation from other peer |
266 d = transport_data['activation_d'] = defer.Deferred() | 264 d = transport_data['activation_d'] = defer.Deferred() |
267 else: | 265 else: |
268 d = defer.succeed(None) | 266 d = defer.succeed(None) |
269 | 267 |
270 if content_data['senders'] == session['role']: | 268 if content_data['senders'] == session['role']: |
271 # we can now start the file transfer (or start it after proxy activation) | 269 # we can now start the stream transfer (or start it after proxy activation) |
272 d.addCallback(lambda dummy: choosed_candidate.startTransfer(transport_data['session_hash'])) | 270 d.addCallback(lambda dummy: choosed_candidate.startTransfer(transport_data['session_hash'])) |
273 d.addErrback(self._startEb, session, content_name, client) | 271 d.addErrback(self._startEb, session, content_name, client) |
274 | 272 |
275 def _startEb(self, fail, session, content_name, client): | 273 def _startEb(self, fail, session, content_name, client): |
276 """Called when it's not possible to start the transfer | 274 """Called when it's not possible to start the transfer |
340 activation_d.callback(None) | 338 activation_d.callback(None) |
341 else: | 339 else: |
342 activation_d.errback(ProxyError()) | 340 activation_d.errback(ProxyError()) |
343 | 341 |
344 @defer.inlineCallbacks | 342 @defer.inlineCallbacks |
345 def jingleHandler(self, action, session, content_name, transport_elt, profile): | 343 def jingleHandler(self, client, action, session, content_name, transport_elt): |
346 client = self.host.getClient(profile) | |
347 content_data = session['contents'][content_name] | 344 content_data = session['contents'][content_name] |
348 transport_data = content_data['transport_data'] | 345 transport_data = content_data['transport_data'] |
349 | 346 |
350 if action in (self._j.A_ACCEPTED_ACK, self._j.A_PREPARE_RESPONDER): | 347 if action in (self._j.A_ACCEPTED_ACK, self._j.A_PREPARE_RESPONDER): |
351 pass | 348 pass |
356 transport_data['peer_candidates'] = self._parseCandidates(transport_elt) | 353 transport_data['peer_candidates'] = self._parseCandidates(transport_elt) |
357 | 354 |
358 elif action == self._j.A_START: | 355 elif action == self._j.A_START: |
359 session_hash = transport_data['session_hash'] | 356 session_hash = transport_data['session_hash'] |
360 peer_candidates = transport_data['peer_candidates'] | 357 peer_candidates = transport_data['peer_candidates'] |
361 file_obj = content_data['file_obj'] | 358 stream_object = content_data['stream_object'] |
362 self._s5b.associateFileObj(session_hash, file_obj, profile) | 359 self._s5b.associateStreamObject(client, session_hash, stream_object) |
363 stream_d = transport_data.pop('stream_d') | 360 stream_d = transport_data.pop('stream_d') |
364 stream_d.chainDeferred(content_data['finished_d']) | 361 stream_d.chainDeferred(content_data['finished_d']) |
365 peer_session_hash = transport_data['peer_session_hash'] | 362 peer_session_hash = transport_data['peer_session_hash'] |
366 d = self._s5b.getBestCandidate(peer_candidates, session_hash, peer_session_hash, profile) | 363 d = self._s5b.getBestCandidate(client, peer_candidates, session_hash, peer_session_hash) |
367 d.addCallback(self._foundPeerCandidate, session, transport_data, content_name, client) | 364 d.addCallback(self._foundPeerCandidate, session, transport_data, content_name, client) |
368 | 365 |
369 elif action == self._j.A_SESSION_INITIATE: | 366 elif action == self._j.A_SESSION_INITIATE: |
370 # responder side, we select a candidate in the ones sent by initiator | 367 # responder side, we select a candidate in the ones sent by initiator |
371 # and we give our candidates | 368 # and we give our candidates |
372 assert 'peer_candidates' not in transport_data | 369 assert 'peer_candidates' not in transport_data |
373 sid = transport_data['sid'] = transport_elt['sid'] | 370 sid = transport_data['sid'] = transport_elt['sid'] |
374 session_hash = transport_data['session_hash'] = self._s5b.getSessionHash(client.jid, session['peer_jid'], sid) | 371 session_hash = transport_data['session_hash'] = self._s5b.getSessionHash(client.jid, session['peer_jid'], sid) |
375 peer_session_hash = transport_data['peer_session_hash'] = self._s5b.getSessionHash(session['peer_jid'], client.jid, sid) # requester and target are inversed for peer candidates | 372 peer_session_hash = transport_data['peer_session_hash'] = self._s5b.getSessionHash(session['peer_jid'], client.jid, sid) # requester and target are inversed for peer candidates |
376 peer_candidates = transport_data['peer_candidates'] = self._parseCandidates(transport_elt) | 373 peer_candidates = transport_data['peer_candidates'] = self._parseCandidates(transport_elt) |
377 file_obj = content_data['file_obj'] | 374 stream_object = content_data['stream_object'] |
378 stream_d = self._s5b.registerHash(session_hash, file_obj, profile) | 375 stream_d = self._s5b.registerHash(client, session_hash, stream_object) |
379 stream_d.chainDeferred(content_data['finished_d']) | 376 stream_d.chainDeferred(content_data['finished_d']) |
380 d = self._s5b.getBestCandidate(peer_candidates, session_hash, peer_session_hash, profile) | 377 d = self._s5b.getBestCandidate(client, peer_candidates, session_hash, peer_session_hash) |
381 d.addCallback(self._foundPeerCandidate, session, transport_data, content_name, client) | 378 d.addCallback(self._foundPeerCandidate, session, transport_data, content_name, client) |
382 candidates = yield self._s5b.getCandidates(profile) | 379 candidates = yield self._s5b.getCandidates(client) |
383 # we remove duplicate candidates | 380 # we remove duplicate candidates |
384 candidates = [candidate for candidate in candidates if candidate not in peer_candidates] | 381 candidates = [candidate for candidate in candidates if candidate not in peer_candidates] |
385 | 382 |
386 transport_data['candidates'] = candidates | 383 transport_data['candidates'] = candidates |
387 # we can now build a new <transport> element with our candidates | 384 # we can now build a new <transport> element with our candidates |
411 else: | 408 else: |
412 log.warning(u"FIXME: unmanaged action {}".format(action)) | 409 log.warning(u"FIXME: unmanaged action {}".format(action)) |
413 | 410 |
414 defer.returnValue(transport_elt) | 411 defer.returnValue(transport_elt) |
415 | 412 |
416 def jingleTerminate(self, action, session, content_name, reason_elt, profile): | 413 def jingleTerminate(self, client, action, session, content_name, reason_elt): |
417 if reason_elt.decline: | 414 if reason_elt.decline: |
418 log.debug(u"Session declined, deleting S5B session") | 415 log.debug(u"Session declined, deleting S5B session") |
419 # we just need to clean the S5B session if it is declined | 416 # we just need to clean the S5B session if it is declined |
420 client = self.host.getClient(profile) | |
421 content_data = session['contents'][content_name] | 417 content_data = session['contents'][content_name] |
422 transport_data = content_data['transport_data'] | 418 transport_data = content_data['transport_data'] |
423 self._s5b.killSession(None, transport_data['session_hash'], None, client) | 419 self._s5b.killSession(None, transport_data['session_hash'], None, client) |
424 | 420 |
425 def _doFallback(self, feature_checked, session, content_name, client): | 421 def _doFallback(self, feature_checked, session, content_name, client): |
427 | 423 |
428 @param feature_checked(bool): True if other peer can do IBB | 424 @param feature_checked(bool): True if other peer can do IBB |
429 """ | 425 """ |
430 if not feature_checked: | 426 if not feature_checked: |
431 log.warning(u"Other peer can't manage jingle IBB, be have to terminate the session") | 427 log.warning(u"Other peer can't manage jingle IBB, be have to terminate the session") |
432 self._j.terminate(self._j.REASON_CONNECTIVITY_ERROR, session, client.profile) | 428 self._j.terminate(client, self._j.REASON_CONNECTIVITY_ERROR, session) |
433 else: | 429 else: |
434 self._j.transportReplace(self._jingle_ibb.NAMESPACE, session, content_name, client.profile) | 430 self._j.transportReplace(client, self._jingle_ibb.NAMESPACE, session, content_name) |
435 | 431 |
436 def doFallback(self, session, content_name, client): | 432 def doFallback(self, session, content_name, client): |
437 """Fallback to IBB transport, used in last resort | 433 """Fallback to IBB transport, used in last resort |
438 | 434 |
439 @param session(dict): session data | 435 @param session(dict): session data |
443 if session['role'] != self._j.ROLE_INITIATOR: | 439 if session['role'] != self._j.ROLE_INITIATOR: |
444 # only initiator must do the fallback, see XEP-0260 §3 | 440 # only initiator must do the fallback, see XEP-0260 §3 |
445 return | 441 return |
446 if self._jingle_ibb is None: | 442 if self._jingle_ibb is None: |
447 log.warning(u"Jingle IBB (XEP-0261) plugin is not available, we have to close the session") | 443 log.warning(u"Jingle IBB (XEP-0261) plugin is not available, we have to close the session") |
448 self._j.terminate(self._j.REASON_CONNECTIVITY_ERROR, session, client.profile) | 444 self._j.terminate(client, self._j.REASON_CONNECTIVITY_ERROR, session) |
449 else: | 445 else: |
450 d = self.host.hasFeature(client, self._jingle_ibb.NAMESPACE, session['peer_jid']) | 446 d = self.host.hasFeature(client, self._jingle_ibb.NAMESPACE, session['peer_jid']) |
451 d.addCallback(self._doFallback, session, content_name, client) | 447 d.addCallback(self._doFallback, session, content_name, client) |
452 return d | 448 return d |
453 | 449 |