Mercurial > libervia-backend
diff src/plugins/plugin_xep_0260.py @ 2489:e2a7bb875957
plugin pipe/stream, file transfert: refactoring and improvments:
this is a big patch as things had to be changed at the same time.
- changed methods using profile argument to use client instead
- move SatFile in a new tools.stream module, has it should be part of core, not a plugin
- new IStreamProducer interface, to handler starting a pull producer
- new FileStreamObject which create a stream producer/consumer from a SatFile
- plugin pipe is no more using unix named pipe, as it complicate the thing,
special care need to be taken to not block, and it's generally not necessary.
Instead a socket is now used, so the plugin has been renomed to jingle stream.
- bad connection/error should be better handler in jingle stream plugin, and code should not block anymore
- jp pipe commands have been updated accordingly
fix bug 237
author | Goffi <goffi@goffi.org> |
---|---|
date | Thu, 08 Feb 2018 00:37:42 +0100 |
parents | 0046283a285d |
children | 67cc54b01a12 |
line wrap: on
line diff
--- a/src/plugins/plugin_xep_0260.py Thu Feb 01 07:24:34 2018 +0100 +++ b/src/plugins/plugin_xep_0260.py Thu Feb 08 00:37:42 2018 +0100 @@ -127,44 +127,42 @@ return transport_elt @defer.inlineCallbacks - def jingleSessionInit(self, session, content_name, profile): - client = self.host.getClient(profile) + def jingleSessionInit(self, client, session, content_name): content_data = session['contents'][content_name] transport_data = content_data['transport_data'] sid = transport_data['sid'] = unicode(uuid.uuid4()) session_hash = transport_data['session_hash'] = self._s5b.getSessionHash(client.jid, session['peer_jid'], sid) transport_data['peer_session_hash'] = self._s5b.getSessionHash(session['peer_jid'], client.jid, sid) # requester and target are inversed for peer candidates - transport_data['stream_d'] = self._s5b.registerHash(session_hash, None, profile) - candidates = transport_data['candidates'] = yield self._s5b.getCandidates(profile) + transport_data['stream_d'] = self._s5b.registerHash(client, session_hash, None) + candidates = transport_data['candidates'] = yield self._s5b.getCandidates(client) mode = 'tcp' # XXX: we only manage tcp for now transport_elt = self._buildCandidates(session, candidates, sid, session_hash, client, mode) defer.returnValue(transport_elt) - def _proxyActivatedCb(self, iq_result_elt, candidate, session, content_name, profile): + def _proxyActivatedCb(self, iq_result_elt, client, candidate, session, content_name): """Called when activation confirmation has been received from proxy cf XEP-0260 § 2.4 """ # now that the proxy is activated, we have to inform other peer - iq_elt, transport_elt = self._j.buildAction(self._j.A_TRANSPORT_INFO, session, content_name, profile) + iq_elt, transport_elt = self._j.buildAction(client, self._j.A_TRANSPORT_INFO, session, content_name) activated_elt = transport_elt.addElement('activated') activated_elt['cid'] = candidate.id iq_elt.send() - def _proxyActivatedEb(self, stanza_error, candidate, session, content_name, profile): + def _proxyActivatedEb(self, stanza_error, client, candidate, session, content_name): """Called when activation error has been received from proxy cf XEP-0260 § 2.4 """ # TODO: fallback to IBB # now that the proxy is activated, we have to inform other peer - iq_elt, transport_elt = self._j.buildAction(self._j.A_TRANSPORT_INFO, session, content_name, profile) + iq_elt, transport_elt = self._j.buildAction(client, self._j.A_TRANSPORT_INFO, session, content_name) transport_elt.addElement('proxy-error') iq_elt.send() log.warning(u"Can't activate proxy, we need to fallback to IBB: {reason}" .format(reason = stanza_error.value.condition)) - client = self.host.getClient(profile) self.doFallback(session, content_name, client) def _foundPeerCandidate(self, candidate, session, transport_data, content_name, client): @@ -185,7 +183,7 @@ continue c.discard() del transport_data['peer_candidates'] - iq_elt, transport_elt = self._j.buildAction(self._j.A_TRANSPORT_INFO, session, content_name, client.profile) + iq_elt, transport_elt = self._j.buildAction(client, self._j.A_TRANSPORT_INFO, session, content_name) if candidate is None: log.warning(u"Can't connect to any peer candidate") candidate_elt = transport_elt.addElement('candidate-error') @@ -254,12 +252,12 @@ del transport_data['peer_best_candidate'] if choosed_candidate.type == self._s5b.TYPE_PROXY: - # the file transfer need to wait for proxy activation + # the stream transfer need to wait for proxy activation # (see XEP-0260 § 2.4) if our_candidate: - d = self._s5b.connectCandidate(choosed_candidate, transport_data['session_hash'], profile=client.profile) + d = self._s5b.connectCandidate(client, choosed_candidate, transport_data['session_hash']) d.addCallback(lambda dummy: choosed_candidate.activate(transport_data['sid'], session['peer_jid'], client)) - args = [choosed_candidate, session, content_name, client.profile] + args = [client, choosed_candidate, session, content_name] d.addCallbacks(self._proxyActivatedCb, self._proxyActivatedEb, args, None, args) else: # this Deferred will be called when we'll receive activation confirmation from other peer @@ -268,7 +266,7 @@ d = defer.succeed(None) if content_data['senders'] == session['role']: - # we can now start the file transfer (or start it after proxy activation) + # we can now start the stream transfer (or start it after proxy activation) d.addCallback(lambda dummy: choosed_candidate.startTransfer(transport_data['session_hash'])) d.addErrback(self._startEb, session, content_name, client) @@ -342,8 +340,7 @@ activation_d.errback(ProxyError()) @defer.inlineCallbacks - def jingleHandler(self, action, session, content_name, transport_elt, profile): - client = self.host.getClient(profile) + def jingleHandler(self, client, action, session, content_name, transport_elt): content_data = session['contents'][content_name] transport_data = content_data['transport_data'] @@ -358,12 +355,12 @@ elif action == self._j.A_START: session_hash = transport_data['session_hash'] peer_candidates = transport_data['peer_candidates'] - file_obj = content_data['file_obj'] - self._s5b.associateFileObj(session_hash, file_obj, profile) + stream_object = content_data['stream_object'] + self._s5b.associateStreamObject(client, session_hash, stream_object) stream_d = transport_data.pop('stream_d') stream_d.chainDeferred(content_data['finished_d']) peer_session_hash = transport_data['peer_session_hash'] - d = self._s5b.getBestCandidate(peer_candidates, session_hash, peer_session_hash, profile) + d = self._s5b.getBestCandidate(client, peer_candidates, session_hash, peer_session_hash) d.addCallback(self._foundPeerCandidate, session, transport_data, content_name, client) elif action == self._j.A_SESSION_INITIATE: @@ -374,12 +371,12 @@ session_hash = transport_data['session_hash'] = self._s5b.getSessionHash(client.jid, session['peer_jid'], sid) peer_session_hash = transport_data['peer_session_hash'] = self._s5b.getSessionHash(session['peer_jid'], client.jid, sid) # requester and target are inversed for peer candidates peer_candidates = transport_data['peer_candidates'] = self._parseCandidates(transport_elt) - file_obj = content_data['file_obj'] - stream_d = self._s5b.registerHash(session_hash, file_obj, profile) + stream_object = content_data['stream_object'] + stream_d = self._s5b.registerHash(client, session_hash, stream_object) stream_d.chainDeferred(content_data['finished_d']) - d = self._s5b.getBestCandidate(peer_candidates, session_hash, peer_session_hash, profile) + d = self._s5b.getBestCandidate(client, peer_candidates, session_hash, peer_session_hash) d.addCallback(self._foundPeerCandidate, session, transport_data, content_name, client) - candidates = yield self._s5b.getCandidates(profile) + candidates = yield self._s5b.getCandidates(client) # we remove duplicate candidates candidates = [candidate for candidate in candidates if candidate not in peer_candidates] @@ -413,11 +410,10 @@ defer.returnValue(transport_elt) - def jingleTerminate(self, action, session, content_name, reason_elt, profile): + def jingleTerminate(self, client, action, session, content_name, reason_elt): if reason_elt.decline: log.debug(u"Session declined, deleting S5B session") # we just need to clean the S5B session if it is declined - client = self.host.getClient(profile) content_data = session['contents'][content_name] transport_data = content_data['transport_data'] self._s5b.killSession(None, transport_data['session_hash'], None, client) @@ -429,9 +425,9 @@ """ if not feature_checked: log.warning(u"Other peer can't manage jingle IBB, be have to terminate the session") - self._j.terminate(self._j.REASON_CONNECTIVITY_ERROR, session, client.profile) + self._j.terminate(client, self._j.REASON_CONNECTIVITY_ERROR, session) else: - self._j.transportReplace(self._jingle_ibb.NAMESPACE, session, content_name, client.profile) + self._j.transportReplace(client, self._jingle_ibb.NAMESPACE, session, content_name) def doFallback(self, session, content_name, client): """Fallback to IBB transport, used in last resort @@ -445,7 +441,7 @@ return if self._jingle_ibb is None: log.warning(u"Jingle IBB (XEP-0261) plugin is not available, we have to close the session") - self._j.terminate(self._j.REASON_CONNECTIVITY_ERROR, session, client.profile) + self._j.terminate(client, self._j.REASON_CONNECTIVITY_ERROR, session) else: d = self.host.hasFeature(client, self._jingle_ibb.NAMESPACE, session['peer_jid']) d.addCallback(self._doFallback, session, content_name, client)