Mercurial > libervia-backend
comparison src/plugins/plugin_xep_0065.py @ 1758:a66d34353f34
plugin XEP-0260, XEP-0065: fixed session hash handling:
with XEP-0260, we need to manage 2 hashes (one with SHA1(SID + Initiator JID + Responder JID) and the other with SHA1(SID + Responder JID + Initiator JID)). This was not handled correclty, resulting in transfer failure in several cases.
author | Goffi <goffi@goffi.org> |
---|---|
date | Thu, 17 Dec 2015 22:37:59 +0100 |
parents | abd6d6f89006 |
children | 81923b3f8b14 |
comparison
equal
deleted
inserted
replaced
1757:abd6d6f89006 | 1758:a66d34353f34 |
---|---|
281 else: | 281 else: |
282 chunk_size = None | 282 chunk_size = None |
283 self.factory.startTransfer(session_hash, chunk_size=chunk_size) | 283 self.factory.startTransfer(session_hash, chunk_size=chunk_size) |
284 | 284 |
285 | 285 |
286 def getSessionHash(from_jid, to_jid, sid): | 286 def getSessionHash(requester_jid, target_jid, sid): |
287 """Calculate SHA1 Hash according to XEP-0065 §5.3.2 | 287 """Calculate SHA1 Hash according to XEP-0065 §5.3.2 |
288 | 288 |
289 @param from_jid(jid.JID): jid of the requester | 289 @param requester_jid(jid.JID): jid of the requester (the one which activate the proxy) |
290 @param to_jid(jid.JID): jid of the target | 290 @param target_jid(jid.JID): jid of the target |
291 @param sid(unicode): session id | 291 @param sid(unicode): session id |
292 @return (str): hash | 292 @return (str): hash |
293 """ | 293 """ |
294 return hashlib.sha1((sid + from_jid.full() + to_jid.full()).encode('utf-8')).hexdigest() | 294 return hashlib.sha1((sid + requester_jid.full() + target_jid.full()).encode('utf-8')).hexdigest() |
295 | 295 |
296 | 296 |
297 class SOCKSv5(protocol.Protocol, FileSender): | 297 class SOCKSv5(protocol.Protocol, FileSender): |
298 CHUNK_SIZE = 2**16 | 298 CHUNK_SIZE = 2**16 |
299 | 299 |
615 | 615 |
616 | 616 |
617 class Socks5ClientFactory(protocol.ClientFactory): | 617 class Socks5ClientFactory(protocol.ClientFactory): |
618 protocol = SOCKSv5 | 618 protocol = SOCKSv5 |
619 | 619 |
620 def __init__(self, parent, session_hash, profile): | 620 def __init__(self, parent, session, session_hash, profile): |
621 """Init the Client Factory | 621 """Init the Client Factory |
622 | 622 |
623 @param session_hash(unicode): hash of the session | 623 @param session(dict): session data |
624 hash is the same as hostname computer in XEP-0065 § 5.3.2 #1 | 624 @param session_hash(unicode): hash used for peer_connection |
625 hash is the same as hostname computed in XEP-0065 § 5.3.2 #1 | |
625 @param profile(unciode): %(doc_profile)s | 626 @param profile(unciode): %(doc_profile)s |
626 """ | 627 """ |
627 self.session = parent.getSession(session_hash, profile) | 628 self.session = session |
628 self.session_hash = session_hash | 629 self.session_hash = session_hash |
629 self.profile = profile | 630 self.profile = profile |
630 self.connection = defer.Deferred() | 631 self.connection = defer.Deferred() |
631 self._protocol_instance = None | 632 self._protocol_instance = None |
632 self.connector = None | 633 self.connector = None |
860 @return (D): Deferred fired when factory connection is done or has failed | 861 @return (D): Deferred fired when factory connection is done or has failed |
861 """ | 862 """ |
862 candidate.factory.connector = connector | 863 candidate.factory.connector = connector |
863 return candidate.factory.connection | 864 return candidate.factory.connection |
864 | 865 |
865 def connectCandidate(self, candidate, session_hash, delay=None, profile=C.PROF_KEY_NONE): | 866 def connectCandidate(self, candidate, session_hash, peer_session_hash=None, delay=None, profile=C.PROF_KEY_NONE): |
866 """Connect to a candidate | 867 """Connect to a candidate |
867 | 868 |
868 Connection will be done with a Socks5ClientFactory | 869 Connection will be done with a Socks5ClientFactory |
869 @param candidate(Candidate): candidate to connect to | 870 @param candidate(Candidate): candidate to connect to |
870 @param session_hash(unicode): hash of the session | 871 @param session_hash(unicode): hash of the session |
871 hash is the same as hostname computer in XEP-0065 § 5.3.2 #1 | 872 hash is the same as hostname computed in XEP-0065 § 5.3.2 #1 |
873 @param peer_session_hash(unicode, None): hash used with the peer | |
874 None to use session_hash. | |
875 None must be used in 2 cases: | |
876 - when XEP-0065 is used with XEP-0096 | |
877 - when a peer connect to a proxy *he proposed himself* | |
878 in practice, peer_session_hash is only used by tryCandidates | |
872 @param delay(None, float): optional delay to wait before connection, in seconds | 879 @param delay(None, float): optional delay to wait before connection, in seconds |
873 @param profile: %(doc_profile)s | 880 @param profile: %(doc_profile)s |
874 @return (D): Deferred launched when TCP connection + Socks5 connection is done | 881 @return (D): Deferred launched when TCP connection + Socks5 connection is done |
875 """ | 882 """ |
876 factory = Socks5ClientFactory(self, session_hash, profile) | 883 if peer_session_hash is None: |
884 # for XEP-0065, only one hash is needed | |
885 peer_session_hash = session_hash | |
886 session = self.getSession(session_hash, profile) | |
887 factory = Socks5ClientFactory(self, session, peer_session_hash, profile) | |
877 candidate.factory = factory | 888 candidate.factory = factory |
878 if delay is None: | 889 if delay is None: |
879 d = defer.succeed(candidate.host) | 890 d = defer.succeed(candidate.host) |
880 else: | 891 else: |
881 d = sat_defer.DelayedDeferred(delay, candidate.host) | 892 d = sat_defer.DelayedDeferred(delay, candidate.host) |
882 d.addCallback(reactor.connectTCP, candidate.port, factory) | 893 d.addCallback(reactor.connectTCP, candidate.port, factory) |
883 d.addCallback(self._addConnector, candidate) | 894 d.addCallback(self._addConnector, candidate) |
884 return d | 895 return d |
885 | 896 |
886 def tryCandidates(self, candidates, session_hash, connection_cb=None, connection_eb=None, profile=C.PROF_KEY_NONE): | 897 def tryCandidates(self, candidates, session_hash, peer_session_hash, connection_cb=None, connection_eb=None, profile=C.PROF_KEY_NONE): |
887 defers_list = [] | 898 defers_list = [] |
888 | 899 |
889 for candidate in candidates: | 900 for candidate in candidates: |
890 delay = CANDIDATE_DELAY * len(defers_list) | 901 delay = CANDIDATE_DELAY * len(defers_list) |
891 if candidate.type == XEP_0065.TYPE_PROXY: | 902 if candidate.type == XEP_0065.TYPE_PROXY: |
892 delay += CANDIDATE_DELAY_PROXY | 903 delay += CANDIDATE_DELAY_PROXY |
893 d = self.connectCandidate(candidate, session_hash, delay, profile) | 904 d = self.connectCandidate(candidate, session_hash, peer_session_hash, delay, profile) |
894 if connection_cb is not None: | 905 if connection_cb is not None: |
895 d.addCallback(lambda dummy, candidate=candidate, profile=profile: connection_cb(candidate, profile)) | 906 d.addCallback(lambda dummy, candidate=candidate, profile=profile: connection_cb(candidate, profile)) |
896 if connection_eb is not None: | 907 if connection_eb is not None: |
897 d.addErrback(connection_eb, candidate, profile) | 908 d.addErrback(connection_eb, candidate, profile) |
898 defers_list.append(d) | 909 defers_list.append(d) |
899 | 910 |
900 return defers_list | 911 return defers_list |
901 | 912 |
902 def getBestCandidate(self, candidates, session_hash, profile=C.PROF_KEY_NONE): | 913 def getBestCandidate(self, candidates, session_hash, peer_session_hash=None, profile=C.PROF_KEY_NONE): |
903 """Get best candidate (according to priority) which can connect | 914 """Get best candidate (according to priority) which can connect |
904 | 915 |
905 @param candidates(iterable[Candidate]): candidates to test | 916 @param candidates(iterable[Candidate]): candidates to test |
906 @param session_hash(unicode): hash of the session | 917 @param session_hash(unicode): hash of the session |
907 hash is the same as hostname computer in XEP-0065 § 5.3.2 #1 | 918 hash is the same as hostname computed in XEP-0065 § 5.3.2 #1 |
919 @param peer_session_hash(unicode, None): hash of the other peer | |
920 only useful for XEP-0260, must be None for XEP-0065 streamhost candidates | |
908 @param profile: %(doc_profile)s | 921 @param profile: %(doc_profile)s |
909 @return (D(None, Candidate)): best candidate or None if none can connect | 922 @return (D(None, Candidate)): best candidate or None if none can connect |
910 """ | 923 """ |
911 defer_candidates = None | 924 defer_candidates = None |
912 | 925 |
932 def allTested(self): | 945 def allTested(self): |
933 log.debug(u"All candidates have been tested") | 946 log.debug(u"All candidates have been tested") |
934 good_candidates = [c for c in candidates if c] | 947 good_candidates = [c for c in candidates if c] |
935 return good_candidates[0] if good_candidates else None | 948 return good_candidates[0] if good_candidates else None |
936 | 949 |
937 defer_candidates = self.tryCandidates(candidates, session_hash, connectionCb, connectionEb, profile) | 950 defer_candidates = self.tryCandidates(candidates, session_hash, peer_session_hash, connectionCb, connectionEb, profile) |
938 d_list = defer.DeferredList(defer_candidates) | 951 d_list = defer.DeferredList(defer_candidates) |
939 d_list.addCallback(allTested) | 952 d_list.addCallback(allTested) |
940 return d_list | 953 return d_list |
941 | 954 |
942 def _timeOut(self, session_hash, client): | 955 def _timeOut(self, session_hash, client): |
964 reason='' if reason is None else reason.value, | 977 reason='' if reason is None else reason.value, |
965 id='' if sid is None else u' (id: {})'.format(sid), | 978 id='' if sid is None else u' (id: {})'.format(sid), |
966 )) | 979 )) |
967 | 980 |
968 try: | 981 try: |
969 # XXX: we need to be sure that hash is removed from self.hash_profiles_map | 982 assert self.hash_profiles_map[session_hash] == client.profile |
970 # ONLY if it's the profile requesting the session killing | 983 del self.hash_profiles_map[session_hash] |
971 # otherwise, this will result in a missing hash when the 2 peers | |
972 # are on the same instance | |
973 if self.hash_profiles_map[session_hash] == client.profile: | |
974 del self.hash_profiles_map[session_hash] | |
975 except KeyError: | 984 except KeyError: |
976 pass | 985 pass |
977 | 986 |
978 if sid is not None: | 987 if sid is not None: |
979 try: | 988 try: |
1119 | 1128 |
1120 def getSession(self, session_hash, profile): | 1129 def getSession(self, session_hash, profile): |
1121 """Return session data | 1130 """Return session data |
1122 | 1131 |
1123 @param session_hash(unicode): hash of the session | 1132 @param session_hash(unicode): hash of the session |
1124 hash is the same as hostname computer in XEP-0065 § 5.3.2 #1 | 1133 hash is the same as hostname computed in XEP-0065 § 5.3.2 #1 |
1125 @param profile(None, unicode): profile of the peer | 1134 @param profile(None, unicode): profile of the peer |
1126 None is used only if profile is unknown (this is only the case | 1135 None is used only if profile is unknown (this is only the case |
1127 for incoming request received by Socks5ServerFactory). None must | 1136 for incoming request received by Socks5ServerFactory). None must |
1128 only be used by Socks5ServerFactory. | 1137 only be used by Socks5ServerFactory. |
1129 See comments below for details | 1138 See comments below for details |
1163 } | 1172 } |
1164 | 1173 |
1165 if file_obj is not None: | 1174 if file_obj is not None: |
1166 session_data['file'] = file_obj | 1175 session_data['file'] = file_obj |
1167 | 1176 |
1168 if session_hash in self.hash_profiles_map: | 1177 assert session_hash not in self.hash_profiles_map |
1169 # The only case when 2 profiles want to register the same hash | 1178 self.hash_profiles_map[session_hash] = profile |
1170 # is when they are on the same instance | |
1171 log.info(u"Both Socks5 peers are on the same instance") | |
1172 # XXX:If both peers are on the same instance, they'll register the same | |
1173 # session_hash, so we'll have 2 profiles for the same hash. The first | |
1174 # one will be the responder (and so the second one the initiator). | |
1175 # As we'll keep the initiator choosed candidate (see XEP-0260 § 2.4 #4), | |
1176 # responder will handle the Socks5 server. Only the server will use | |
1177 # self.hash_profiles_map to get the profile, so we can ignore the second | |
1178 # one (the initiator profile). | |
1179 # There is no easy way to known if the incoming connection | |
1180 # to the Socks5Server is from initiator or responder, so this seams a | |
1181 # reasonable workaround. | |
1182 # NOTE: this workaround is only used with XEP-0260 | |
1183 else: | |
1184 self.hash_profiles_map[session_hash] = profile | |
1185 | 1179 |
1186 return session_data | 1180 return session_data |
1187 | 1181 |
1188 def associateFileObj(self, session_hash, file_obj, profile): | 1182 def associateFileObj(self, session_hash, file_obj, profile): |
1189 """Associate a file obj with a session""" | 1183 """Associate a file obj with a session""" |
1232 candidates.append(Candidate(host, port, type_, priority, jid_)) | 1226 candidates.append(Candidate(host, port, type_, priority, jid_)) |
1233 | 1227 |
1234 for candidate in candidates: | 1228 for candidate in candidates: |
1235 log.info(u"Candidate proposed: {}".format(candidate)) | 1229 log.info(u"Candidate proposed: {}".format(candidate)) |
1236 | 1230 |
1237 d = self.getBestCandidate(candidates, session_data['hash'], profile) | 1231 d = self.getBestCandidate(candidates, session_data['hash'], profile=profile) |
1238 d.addCallback(self._ackStream, iq_elt, session_data, client) | 1232 d.addCallback(self._ackStream, iq_elt, session_data, client) |
1239 | 1233 |
1240 def _ackStream(self, candidate, iq_elt, session_data, client): | 1234 def _ackStream(self, candidate, iq_elt, session_data, client): |
1241 if candidate is None: | 1235 if candidate is None: |
1242 log.info("No streamhost candidate worked, we have to end negotiation") | 1236 log.info("No streamhost candidate worked, we have to end negotiation") |