diff src/plugins/plugin_xep_0260.py @ 2489:e2a7bb875957

plugin pipe/stream, file transfert: refactoring and improvments: this is a big patch as things had to be changed at the same time. - changed methods using profile argument to use client instead - move SatFile in a new tools.stream module, has it should be part of core, not a plugin - new IStreamProducer interface, to handler starting a pull producer - new FileStreamObject which create a stream producer/consumer from a SatFile - plugin pipe is no more using unix named pipe, as it complicate the thing, special care need to be taken to not block, and it's generally not necessary. Instead a socket is now used, so the plugin has been renomed to jingle stream. - bad connection/error should be better handler in jingle stream plugin, and code should not block anymore - jp pipe commands have been updated accordingly fix bug 237
author Goffi <goffi@goffi.org>
date Thu, 08 Feb 2018 00:37:42 +0100
parents 0046283a285d
children 67cc54b01a12
line wrap: on
line diff
--- a/src/plugins/plugin_xep_0260.py	Thu Feb 01 07:24:34 2018 +0100
+++ b/src/plugins/plugin_xep_0260.py	Thu Feb 08 00:37:42 2018 +0100
@@ -127,44 +127,42 @@
         return transport_elt
 
     @defer.inlineCallbacks
-    def jingleSessionInit(self, session, content_name, profile):
-        client = self.host.getClient(profile)
+    def jingleSessionInit(self, client, session, content_name):
         content_data = session['contents'][content_name]
         transport_data = content_data['transport_data']
         sid = transport_data['sid'] = unicode(uuid.uuid4())
         session_hash = transport_data['session_hash'] = self._s5b.getSessionHash(client.jid, session['peer_jid'], sid)
         transport_data['peer_session_hash'] = self._s5b.getSessionHash(session['peer_jid'], client.jid, sid) # requester and target are inversed for peer candidates
-        transport_data['stream_d'] = self._s5b.registerHash(session_hash, None, profile)
-        candidates = transport_data['candidates'] = yield self._s5b.getCandidates(profile)
+        transport_data['stream_d'] = self._s5b.registerHash(client, session_hash, None)
+        candidates = transport_data['candidates'] = yield self._s5b.getCandidates(client)
         mode = 'tcp' # XXX: we only manage tcp for now
         transport_elt = self._buildCandidates(session, candidates, sid, session_hash, client, mode)
 
         defer.returnValue(transport_elt)
 
-    def _proxyActivatedCb(self, iq_result_elt, candidate, session, content_name, profile):
+    def _proxyActivatedCb(self, iq_result_elt, client, candidate, session, content_name):
         """Called when activation confirmation has been received from proxy
 
         cf XEP-0260 § 2.4
         """
         # now that the proxy is activated, we have to inform other peer
-        iq_elt, transport_elt = self._j.buildAction(self._j.A_TRANSPORT_INFO, session, content_name, profile)
+        iq_elt, transport_elt = self._j.buildAction(client, self._j.A_TRANSPORT_INFO, session, content_name)
         activated_elt = transport_elt.addElement('activated')
         activated_elt['cid'] = candidate.id
         iq_elt.send()
 
-    def _proxyActivatedEb(self, stanza_error, candidate, session, content_name, profile):
+    def _proxyActivatedEb(self, stanza_error, client, candidate, session, content_name):
         """Called when activation error has been received from proxy
 
         cf XEP-0260 § 2.4
         """
         # TODO: fallback to IBB
         # now that the proxy is activated, we have to inform other peer
-        iq_elt, transport_elt = self._j.buildAction(self._j.A_TRANSPORT_INFO, session, content_name, profile)
+        iq_elt, transport_elt = self._j.buildAction(client, self._j.A_TRANSPORT_INFO, session, content_name)
         transport_elt.addElement('proxy-error')
         iq_elt.send()
         log.warning(u"Can't activate proxy, we need to fallback to IBB: {reason}"
             .format(reason = stanza_error.value.condition))
-        client = self.host.getClient(profile)
         self.doFallback(session, content_name, client)
 
     def _foundPeerCandidate(self, candidate, session, transport_data, content_name, client):
@@ -185,7 +183,7 @@
                 continue
             c.discard()
         del transport_data['peer_candidates']
-        iq_elt, transport_elt = self._j.buildAction(self._j.A_TRANSPORT_INFO, session, content_name, client.profile)
+        iq_elt, transport_elt = self._j.buildAction(client, self._j.A_TRANSPORT_INFO, session, content_name)
         if candidate is None:
             log.warning(u"Can't connect to any peer candidate")
             candidate_elt = transport_elt.addElement('candidate-error')
@@ -254,12 +252,12 @@
             del transport_data['peer_best_candidate']
 
             if choosed_candidate.type == self._s5b.TYPE_PROXY:
-                # the file transfer need to wait for proxy activation
+                # the stream transfer need to wait for proxy activation
                 # (see XEP-0260 § 2.4)
                 if our_candidate:
-                    d = self._s5b.connectCandidate(choosed_candidate, transport_data['session_hash'], profile=client.profile)
+                    d = self._s5b.connectCandidate(client, choosed_candidate, transport_data['session_hash'])
                     d.addCallback(lambda dummy: choosed_candidate.activate(transport_data['sid'], session['peer_jid'], client))
-                    args = [choosed_candidate, session, content_name, client.profile]
+                    args = [client, choosed_candidate, session, content_name]
                     d.addCallbacks(self._proxyActivatedCb, self._proxyActivatedEb, args, None, args)
                 else:
                     # this Deferred will be called when we'll receive activation confirmation from other peer
@@ -268,7 +266,7 @@
                 d = defer.succeed(None)
 
             if content_data['senders'] == session['role']:
-                # we can now start the file transfer (or start it after proxy activation)
+                # we can now start the stream transfer (or start it after proxy activation)
                 d.addCallback(lambda dummy: choosed_candidate.startTransfer(transport_data['session_hash']))
                 d.addErrback(self._startEb, session, content_name, client)
 
@@ -342,8 +340,7 @@
             activation_d.errback(ProxyError())
 
     @defer.inlineCallbacks
-    def jingleHandler(self, action, session, content_name, transport_elt, profile):
-        client = self.host.getClient(profile)
+    def jingleHandler(self, client, action, session, content_name, transport_elt):
         content_data = session['contents'][content_name]
         transport_data = content_data['transport_data']
 
@@ -358,12 +355,12 @@
         elif action == self._j.A_START:
             session_hash = transport_data['session_hash']
             peer_candidates = transport_data['peer_candidates']
-            file_obj = content_data['file_obj']
-            self._s5b.associateFileObj(session_hash, file_obj, profile)
+            stream_object = content_data['stream_object']
+            self._s5b.associateStreamObject(client, session_hash, stream_object)
             stream_d = transport_data.pop('stream_d')
             stream_d.chainDeferred(content_data['finished_d'])
             peer_session_hash = transport_data['peer_session_hash']
-            d = self._s5b.getBestCandidate(peer_candidates, session_hash, peer_session_hash, profile)
+            d = self._s5b.getBestCandidate(client, peer_candidates, session_hash, peer_session_hash)
             d.addCallback(self._foundPeerCandidate, session, transport_data, content_name, client)
 
         elif action == self._j.A_SESSION_INITIATE:
@@ -374,12 +371,12 @@
             session_hash = transport_data['session_hash'] = self._s5b.getSessionHash(client.jid, session['peer_jid'], sid)
             peer_session_hash = transport_data['peer_session_hash'] = self._s5b.getSessionHash(session['peer_jid'], client.jid, sid) # requester and target are inversed for peer candidates
             peer_candidates = transport_data['peer_candidates'] = self._parseCandidates(transport_elt)
-            file_obj = content_data['file_obj']
-            stream_d = self._s5b.registerHash(session_hash, file_obj, profile)
+            stream_object = content_data['stream_object']
+            stream_d = self._s5b.registerHash(client, session_hash, stream_object)
             stream_d.chainDeferred(content_data['finished_d'])
-            d = self._s5b.getBestCandidate(peer_candidates, session_hash, peer_session_hash, profile)
+            d = self._s5b.getBestCandidate(client, peer_candidates, session_hash, peer_session_hash)
             d.addCallback(self._foundPeerCandidate, session, transport_data, content_name, client)
-            candidates = yield self._s5b.getCandidates(profile)
+            candidates = yield self._s5b.getCandidates(client)
             # we remove duplicate candidates
             candidates = [candidate for candidate in candidates if candidate not in peer_candidates]
 
@@ -413,11 +410,10 @@
 
         defer.returnValue(transport_elt)
 
-    def jingleTerminate(self, action, session, content_name, reason_elt, profile):
+    def jingleTerminate(self, client, action, session, content_name, reason_elt):
         if reason_elt.decline:
             log.debug(u"Session declined, deleting S5B session")
             # we just need to clean the S5B session if it is declined
-            client = self.host.getClient(profile)
             content_data = session['contents'][content_name]
             transport_data = content_data['transport_data']
             self._s5b.killSession(None, transport_data['session_hash'], None, client)
@@ -429,9 +425,9 @@
          """
         if not feature_checked:
             log.warning(u"Other peer can't manage jingle IBB, be have to terminate the session")
-            self._j.terminate(self._j.REASON_CONNECTIVITY_ERROR, session, client.profile)
+            self._j.terminate(client, self._j.REASON_CONNECTIVITY_ERROR, session)
         else:
-            self._j.transportReplace(self._jingle_ibb.NAMESPACE, session, content_name, client.profile)
+            self._j.transportReplace(client, self._jingle_ibb.NAMESPACE, session, content_name)
 
     def doFallback(self, session, content_name, client):
         """Fallback to IBB transport, used in last resort
@@ -445,7 +441,7 @@
             return
         if self._jingle_ibb is None:
             log.warning(u"Jingle IBB (XEP-0261) plugin is not available, we have to close the session")
-            self._j.terminate(self._j.REASON_CONNECTIVITY_ERROR, session, client.profile)
+            self._j.terminate(client, self._j.REASON_CONNECTIVITY_ERROR, session)
         else:
             d = self.host.hasFeature(client, self._jingle_ibb.NAMESPACE, session['peer_jid'])
             d.addCallback(self._doFallback, session, content_name, client)