changeset 1758:a66d34353f34

plugin XEP-0260, XEP-0065: fixed session hash handling: with XEP-0260, we need to manage 2 hashes (one with SHA1(SID + Initiator JID + Responder JID) and the other with SHA1(SID + Responder JID + Initiator JID)). This was not handled correclty, resulting in transfer failure in several cases.
author Goffi <goffi@goffi.org>
date Thu, 17 Dec 2015 22:37:59 +0100
parents abd6d6f89006
children 81923b3f8b14
files src/plugins/plugin_xep_0065.py src/plugins/plugin_xep_0260.py
diffstat 2 files changed, 43 insertions(+), 47 deletions(-) [+]
line wrap: on
line diff
--- a/src/plugins/plugin_xep_0065.py	Thu Dec 17 22:37:58 2015 +0100
+++ b/src/plugins/plugin_xep_0065.py	Thu Dec 17 22:37:59 2015 +0100
@@ -283,15 +283,15 @@
         self.factory.startTransfer(session_hash, chunk_size=chunk_size)
 
 
-def getSessionHash(from_jid, to_jid, sid):
+def getSessionHash(requester_jid, target_jid, sid):
     """Calculate SHA1 Hash according to XEP-0065 §5.3.2
 
-    @param from_jid(jid.JID): jid of the requester
-    @param to_jid(jid.JID): jid of the target
+    @param requester_jid(jid.JID): jid of the requester (the one which activate the proxy)
+    @param target_jid(jid.JID): jid of the target
     @param sid(unicode): session id
     @return (str): hash
     """
-    return hashlib.sha1((sid + from_jid.full() + to_jid.full()).encode('utf-8')).hexdigest()
+    return hashlib.sha1((sid + requester_jid.full() + target_jid.full()).encode('utf-8')).hexdigest()
 
 
 class SOCKSv5(protocol.Protocol, FileSender):
@@ -617,14 +617,15 @@
 class Socks5ClientFactory(protocol.ClientFactory):
     protocol = SOCKSv5
 
-    def __init__(self, parent, session_hash, profile):
+    def __init__(self, parent, session, session_hash, profile):
         """Init the Client Factory
 
-        @param session_hash(unicode): hash of the session
-            hash is the same as hostname computer in XEP-0065 § 5.3.2 #1
+        @param session(dict): session data
+        @param session_hash(unicode): hash used for peer_connection
+            hash is the same as hostname computed in XEP-0065 § 5.3.2 #1
         @param profile(unciode): %(doc_profile)s
         """
-        self.session = parent.getSession(session_hash, profile)
+        self.session = session
         self.session_hash = session_hash
         self.profile = profile
         self.connection = defer.Deferred()
@@ -862,18 +863,28 @@
         candidate.factory.connector = connector
         return candidate.factory.connection
 
-    def connectCandidate(self, candidate, session_hash, delay=None, profile=C.PROF_KEY_NONE):
+    def connectCandidate(self, candidate, session_hash, peer_session_hash=None, delay=None, profile=C.PROF_KEY_NONE):
         """Connect to a candidate
 
         Connection will be done with a Socks5ClientFactory
         @param candidate(Candidate): candidate to connect to
         @param session_hash(unicode): hash of the session
-            hash is the same as hostname computer in XEP-0065 § 5.3.2 #1
+            hash is the same as hostname computed in XEP-0065 § 5.3.2 #1
+        @param peer_session_hash(unicode, None): hash used with the peer
+            None to use session_hash.
+            None must be used in 2 cases:
+                - when XEP-0065 is used with XEP-0096
+                - when a peer connect to a proxy *he proposed himself*
+            in practice, peer_session_hash is only used by tryCandidates
         @param delay(None, float): optional delay to wait before connection, in seconds
         @param profile: %(doc_profile)s
         @return (D): Deferred launched when TCP connection + Socks5 connection is done
         """
-        factory = Socks5ClientFactory(self, session_hash, profile)
+        if peer_session_hash is None:
+            # for XEP-0065, only one hash is needed
+            peer_session_hash = session_hash
+        session = self.getSession(session_hash, profile)
+        factory = Socks5ClientFactory(self, session, peer_session_hash, profile)
         candidate.factory = factory
         if delay is None:
             d = defer.succeed(candidate.host)
@@ -883,14 +894,14 @@
         d.addCallback(self._addConnector, candidate)
         return d
 
-    def tryCandidates(self, candidates, session_hash, connection_cb=None, connection_eb=None, profile=C.PROF_KEY_NONE):
+    def tryCandidates(self, candidates, session_hash, peer_session_hash, connection_cb=None, connection_eb=None, profile=C.PROF_KEY_NONE):
         defers_list = []
 
         for candidate in candidates:
             delay = CANDIDATE_DELAY * len(defers_list)
             if candidate.type == XEP_0065.TYPE_PROXY:
                 delay += CANDIDATE_DELAY_PROXY
-            d = self.connectCandidate(candidate, session_hash, delay, profile)
+            d = self.connectCandidate(candidate, session_hash, peer_session_hash, delay, profile)
             if connection_cb is not None:
                 d.addCallback(lambda dummy, candidate=candidate, profile=profile: connection_cb(candidate, profile))
             if connection_eb is not None:
@@ -899,12 +910,14 @@
 
         return defers_list
 
-    def getBestCandidate(self, candidates, session_hash, profile=C.PROF_KEY_NONE):
+    def getBestCandidate(self, candidates, session_hash, peer_session_hash=None, profile=C.PROF_KEY_NONE):
         """Get best candidate (according to priority) which can connect
 
         @param candidates(iterable[Candidate]): candidates to test
         @param session_hash(unicode): hash of the session
-            hash is the same as hostname computer in XEP-0065 § 5.3.2 #1
+            hash is the same as hostname computed in XEP-0065 § 5.3.2 #1
+        @param peer_session_hash(unicode, None): hash of the other peer
+            only useful for XEP-0260, must be None for XEP-0065 streamhost candidates
         @param profile: %(doc_profile)s
         @return (D(None, Candidate)): best candidate or None if none can connect
         """
@@ -934,7 +947,7 @@
             good_candidates = [c for c in candidates if c]
             return good_candidates[0] if good_candidates else None
 
-        defer_candidates = self.tryCandidates(candidates, session_hash, connectionCb, connectionEb, profile)
+        defer_candidates = self.tryCandidates(candidates, session_hash, peer_session_hash, connectionCb, connectionEb, profile)
         d_list = defer.DeferredList(defer_candidates)
         d_list.addCallback(allTested)
         return d_list
@@ -966,12 +979,8 @@
             ))
 
         try:
-            # XXX: we need to be sure that hash is removed from self.hash_profiles_map
-            #      ONLY if it's the profile requesting the session killing
-            #      otherwise, this will result in a missing hash when the 2 peers
-            #      are on the same instance
-            if self.hash_profiles_map[session_hash] == client.profile:
-                del self.hash_profiles_map[session_hash]
+            assert self.hash_profiles_map[session_hash] == client.profile
+            del self.hash_profiles_map[session_hash]
         except KeyError:
             pass
 
@@ -1121,7 +1130,7 @@
         """Return session data
 
         @param session_hash(unicode): hash of the session
-            hash is the same as hostname computer in XEP-0065 § 5.3.2 #1
+            hash is the same as hostname computed in XEP-0065 § 5.3.2 #1
         @param profile(None, unicode): profile of the peer
             None is used only if profile is unknown (this is only the case
             for incoming request received by Socks5ServerFactory). None must
@@ -1165,23 +1174,8 @@
         if file_obj is not None:
             session_data['file'] = file_obj
 
-        if session_hash in self.hash_profiles_map:
-            # The only case when 2 profiles want to register the same hash
-            # is when they are on the same instance
-            log.info(u"Both Socks5 peers are on the same instance")
-            # XXX:If both peers are on the same instance, they'll register the same
-            #     session_hash, so we'll have 2 profiles for the same hash. The first
-            #     one will be the responder (and so the second one the initiator).
-            #     As we'll keep the initiator choosed candidate (see XEP-0260 § 2.4 #4),
-            #     responder will handle the Socks5 server. Only the server will use
-            #     self.hash_profiles_map to get the profile, so we can ignore the second
-            #     one (the initiator profile).
-            #     There is no easy way to known if the incoming connection
-            #     to the Socks5Server is from initiator or responder, so this seams a
-            #     reasonable workaround.
-            #     NOTE: this workaround is only used with XEP-0260
-        else:
-            self.hash_profiles_map[session_hash] = profile
+        assert session_hash not in self.hash_profiles_map
+        self.hash_profiles_map[session_hash] = profile
 
         return session_data
 
@@ -1234,7 +1228,7 @@
         for candidate in candidates:
             log.info(u"Candidate proposed: {}".format(candidate))
 
-        d = self.getBestCandidate(candidates, session_data['hash'], profile)
+        d = self.getBestCandidate(candidates, session_data['hash'], profile=profile)
         d.addCallback(self._ackStream, iq_elt, session_data, client)
 
     def _ackStream(self, candidate, iq_elt, session_data, client):
--- a/src/plugins/plugin_xep_0260.py	Thu Dec 17 22:37:58 2015 +0100
+++ b/src/plugins/plugin_xep_0260.py	Thu Dec 17 22:37:59 2015 +0100
@@ -132,6 +132,7 @@
         transport_data = content_data['transport_data']
         sid = transport_data['sid'] = unicode(uuid.uuid4())
         session_hash = transport_data['session_hash'] = self._s5b.getSessionHash(client.jid, session['peer_jid'], sid)
+        transport_data['peer_session_hash'] = self._s5b.getSessionHash(session['peer_jid'], client.jid, sid) # requester and target are inversed for peer candidates
         transport_data['stream_d'] = self._s5b.registerHash(session_hash, None, profile)
         candidates = transport_data['candidates'] = yield self._s5b.getCandidates(profile)
         mode = 'tcp' # XXX: we only manage tcp for now
@@ -221,7 +222,7 @@
         else:
             if best_candidate.priority == peer_best_candidate.priority:
                 # same priority, we choose initiator one according to XEP-0260 §2.4 #4
-                log.debug(u"Candidates have same priority, we choose the initiator one")
+                log.debug(u"Candidates have same priority, we select the one choosed by initiator")
                 if session['initiator'] == client.jid:
                     choosed_candidate = best_candidate
                 else:
@@ -353,7 +354,6 @@
             assert 'peer_candidates' not in transport_data
             transport_data['peer_candidates'] = self._parseCandidates(transport_elt)
 
-        # elif action == self._j.A_START:
         elif action == self._j.A_START:
             session_hash = transport_data['session_hash']
             peer_candidates = transport_data['peer_candidates']
@@ -361,7 +361,8 @@
             self._s5b.associateFileObj(session_hash, file_obj, profile)
             stream_d = transport_data.pop('stream_d')
             stream_d.chainDeferred(content_data['finished_d'])
-            d = self._s5b.getBestCandidate(peer_candidates, session_hash, profile)
+            peer_session_hash = transport_data['peer_session_hash']
+            d = self._s5b.getBestCandidate(peer_candidates, session_hash, peer_session_hash, profile)
             d.addCallback(self._foundPeerCandidate, session, transport_data, content_name, client)
 
         elif action == self._j.A_SESSION_INITIATE:
@@ -369,12 +370,13 @@
             # and we give our candidates
             assert 'peer_candidates' not in transport_data
             sid = transport_data['sid'] = transport_elt['sid']
-            session_hash = transport_data['session_hash'] = self._s5b.getSessionHash(session['peer_jid'], client.jid, sid)
+            session_hash = transport_data['session_hash'] = self._s5b.getSessionHash(client.jid, session['peer_jid'], sid)
+            peer_session_hash = transport_data['peer_session_hash'] = self._s5b.getSessionHash(session['peer_jid'], client.jid, sid) # requester and target are inversed for peer candidates
             peer_candidates = transport_data['peer_candidates'] = self._parseCandidates(transport_elt)
             file_obj = content_data['file_obj']
             stream_d = self._s5b.registerHash(session_hash, file_obj, profile)
             stream_d.chainDeferred(content_data['finished_d'])
-            d = self._s5b.getBestCandidate(peer_candidates, session_hash, profile)
+            d = self._s5b.getBestCandidate(peer_candidates, session_hash, peer_session_hash, profile)
             d.addCallback(self._foundPeerCandidate, session, transport_data, content_name, client)
             candidates = yield self._s5b.getCandidates(profile)
             # we remove duplicate candidates
@@ -402,7 +404,7 @@
             if candidate_elt is None:
                 log.warning(u"Unexpected transport element: {}".format(transport_elt.toXml()))
         elif action == self._j.A_DESTROY:
-            # the transport is remplaced (fallback ?). We need mainly to stop kill XEP-0065 session
+            # the transport is replaced (fallback ?), We need mainly to kill XEP-0065 session.
             # note that sid argument is not necessary for sessions created by this plugin
             self._s5b.killSession(None, transport_data['session_hash'], None, client)
         else: