comparison src/plugins/plugin_xep_0065.py @ 1584:b57b4683dc33

plugin XEP-0065: session cleaning and timeout + log chosen candidate when using SI File Transfer
author Goffi <goffi@goffi.org>
date Fri, 13 Nov 2015 16:46:32 +0100
parents d46aae87c03a
children a34d7f621944
comparison
equal deleted inserted replaced
1583:d46aae87c03a 1584:b57b4683dc33
66 from twisted.words.protocols.jabber import error as jabber_error 66 from twisted.words.protocols.jabber import error as jabber_error
67 from twisted.words.protocols.jabber import jid 67 from twisted.words.protocols.jabber import jid
68 from twisted.words.protocols.jabber import xmlstream 68 from twisted.words.protocols.jabber import xmlstream
69 from twisted.protocols.basic import FileSender 69 from twisted.protocols.basic import FileSender
70 from twisted.internet import defer 70 from twisted.internet import defer
71 from twisted.python import failure
72 from collections import namedtuple 71 from collections import namedtuple
73 import struct 72 import struct
74 import hashlib 73 import hashlib
75 import uuid 74 import uuid
76 75
81 except ImportError: 80 except ImportError:
82 from wokkel.subprotocols import XMPPHandler 81 from wokkel.subprotocols import XMPPHandler
83 82
84 from wokkel import disco, iwokkel 83 from wokkel import disco, iwokkel
85 84
86 IQ_SET = '/iq[@type="set"]'
87 NS_BS = 'http://jabber.org/protocol/bytestreams'
88 BS_REQUEST = IQ_SET + '/query[@xmlns="' + NS_BS + '"]'
89 TIMEOUT = 60 # timeout for workflow
90 DEFER_KEY = 'finished' # key of the deferred used to track session end
91 SERVER_STARTING_PORT = 0 # starting number for server port search (0 to ask automatic attribution)
92
93 # priorities are candidates local priorities, must be a int between 0 and 65535
94 PRIORITY_BEST_DIRECT = 10000
95 PRIORITY_DIRECT = 5000
96 PRIORITY_ASSISTED = 1000
97 PRIORITY_PROXY = 0.2 # proxy is the last option for s5b
98 CANDIDATE_DELAY = 0.2 # see XEP-0260 §4
99 CANDIDATE_DELAY_PROXY = 0.2 # additional time for proxy types (see XEP-0260 §4 note 3)
100 85
101 PLUGIN_INFO = { 86 PLUGIN_INFO = {
102 "name": "XEP 0065 Plugin", 87 "name": "XEP 0065 Plugin",
103 "import_name": "XEP-0065", 88 "import_name": "XEP-0065",
104 "type": "XEP", 89 "type": "XEP",
107 "recommendations": ["NAT-PORT"], 92 "recommendations": ["NAT-PORT"],
108 "main": "XEP_0065", 93 "main": "XEP_0065",
109 "handler": "yes", 94 "handler": "yes",
110 "description": _("""Implementation of SOCKS5 Bytestreams""") 95 "description": _("""Implementation of SOCKS5 Bytestreams""")
111 } 96 }
97
98 IQ_SET = '/iq[@type="set"]'
99 NS_BS = 'http://jabber.org/protocol/bytestreams'
100 BS_REQUEST = IQ_SET + '/query[@xmlns="' + NS_BS + '"]'
101 TIMER_KEY = 'timer'
102 DEFER_KEY = 'finished' # key of the deferred used to track session end
103 SERVER_STARTING_PORT = 0 # starting number for server port search (0 to ask automatic attribution)
104
105 # priorities are candidates local priorities, must be a int between 0 and 65535
106 PRIORITY_BEST_DIRECT = 10000
107 PRIORITY_DIRECT = 5000
108 PRIORITY_ASSISTED = 1000
109 PRIORITY_PROXY = 0.2 # proxy is the last option for s5b
110 CANDIDATE_DELAY = 0.2 # see XEP-0260 §4
111 CANDIDATE_DELAY_PROXY = 0.2 # additional time for proxy types (see XEP-0260 §4 note 3)
112
113 TIMEOUT = 300 # maximum time between session creation and stream start
112 114
113 # XXX: by default everything is automatic 115 # XXX: by default everything is automatic
114 # TODO: use these params to force use of specific proxy/port/IP 116 # TODO: use these params to force use of specific proxy/port/IP
115 # PARAMS = """ 117 # PARAMS = """
116 # <params> 118 # <params>
568 try: 570 try:
569 protocol = session['protocols'][0] 571 protocol = session['protocols'][0]
570 except (KeyError, IndexError): 572 except (KeyError, IndexError):
571 log.error(u"Can't start file transfer, can't find protocol") 573 log.error(u"Can't start file transfer, can't find protocol")
572 else: 574 else:
575 session[TIMER_KEY].cancel()
573 protocol.startTransfer(chunk_size) 576 protocol.startTransfer(chunk_size)
574 577
575 def addToSession(self, session_hash, protocol): 578 def addToSession(self, session_hash, protocol):
576 """Check is session_hash is valid, and associate protocol with it 579 """Check is session_hash is valid, and associate protocol with it
577 580
639 642
640 def getSession(self): 643 def getSession(self):
641 return self.session 644 return self.session
642 645
643 def startTransfer(self, dummy=None, chunk_size=None): 646 def startTransfer(self, dummy=None, chunk_size=None):
647 self.session[TIMER_KEY].cancel()
644 self._protocol_instance.startTransfer(chunk_size) 648 self._protocol_instance.startTransfer(chunk_size)
645 649
646 def clientConnectionFailed(self, connector, reason): 650 def clientConnectionFailed(self, connector, reason):
647 log.debug(u"Connection failed") 651 log.debug(u"Connection failed")
648 self.connection.errback(reason) 652 self.connection.errback(reason)
716 The server is created if it doesn't exists yet 720 The server is created if it doesn't exists yet
717 self._server_factory_port is set on server creation 721 self._server_factory_port is set on server creation
718 """ 722 """
719 723
720 if self._server_factory is None: 724 if self._server_factory is None:
721 # self._server_factory = Socks5ServerFactory(self.host, self.hash_profiles_map, lambda sid, client: self._killSession(sid, client))
722 self._server_factory = Socks5ServerFactory(self) 725 self._server_factory = Socks5ServerFactory(self)
723 for port in xrange(SERVER_STARTING_PORT, 65356): 726 for port in xrange(SERVER_STARTING_PORT, 65356):
724 try: 727 try:
725 listening_port = reactor.listenTCP(port, self._server_factory) 728 listening_port = reactor.listenTCP(port, self._server_factory)
726 except internet_error.CannotListenError as e: 729 except internet_error.CannotListenError as e:
859 """ 862 """
860 candidate.factory.connector = connector 863 candidate.factory.connector = connector
861 return candidate.factory.connection 864 return candidate.factory.connection
862 865
863 def connectCandidate(self, candidate, session_hash, delay=None, profile=C.PROF_KEY_NONE): 866 def connectCandidate(self, candidate, session_hash, delay=None, profile=C.PROF_KEY_NONE):
864 """"Connect to a candidate 867 """Connect to a candidate
865 868
866 Connection will be done with a Socks5ClientFactory 869 Connection will be done with a Socks5ClientFactory
867
868 @param candidate(Candidate): candidate to connect to 870 @param candidate(Candidate): candidate to connect to
869 @param session_hash(unicode): hash of the session 871 @param session_hash(unicode): hash of the session
870 hash is the same as hostname computed in XEP-0065 § 5.3.2 #1 872 hash is the same as hostname computed in XEP-0065 § 5.3.2 #1
871 @param delay(None, float): optional delay to wait before connection, in seconds 873 @param delay(None, float): optional delay to wait before connection, in seconds
872 @param profile: %(doc_profile)s 874 @param profile: %(doc_profile)s
936 defer_candidates = self.tryCandidates(candidates, session_hash, connectionCb, connectionEb, profile) 938 defer_candidates = self.tryCandidates(candidates, session_hash, connectionCb, connectionEb, profile)
937 d_list = defer.DeferredList(defer_candidates) 939 d_list = defer.DeferredList(defer_candidates)
938 d_list.addCallback(allTested) 940 d_list.addCallback(allTested)
939 return d_list 941 return d_list
940 942
941 def _timeOut(self, sid, client): 943 def _timeOut(self, session_hash, client):
942 """Delecte current_stream id, called after timeout 944 """Called when stream was not started quickly enough
943 @param id: id of client.xep_0065_sid_session""" 945
944 log.info(_("Socks5 Bytestream: TimeOut reached for id {sid} [{profile}]").format( 946 @param session_hash(str): hash as returned by getSessionHash
945 sid=sid, profile=client.profile))
946 self._killSession(sid, client, u"TIMEOUT")
947
948 def _killSession(self, sid, client, failure_reason=None):
949 """Delete a current_stream id, clean up associated observers
950
951 @param sid(unicode): session id
952 @param client: %(doc_client)s 947 @param client: %(doc_client)s
953 @param failure_reason(None, unicode): if None the session is successful 948 """
954 else, will be used to call failure_cb 949 log.info(u"Socks5 Bytestream: TimeOut reached")
955 """ 950 session = self.getSession(session_hash, client.profile)
956 try: 951 session[DEFER_KEY].errback(exceptions.TimeOutError)
957 session = client.xep_0065_sid_session[sid] 952
958 except KeyError: 953 def _killSession(self, reason, session_hash, sid, client):
959 log.warning(_("kill id called on a non existant id")) 954 """Clean the current session
960 return 955
961 956 @param session_hash(str): hash as returned by getSessionHash
962 try: 957 @param sid(None, unicode): session id
963 observer_cb = session['observer_cb'] 958 or None if self.xep_0065_sid_session was not used
959 @param client: %(doc_client)s
960 @param reason(None, failure.Failure): None if everything was fine, a failure otherwise
961 @return (None, failure.Failure): reason is returned
962 """
963 log.debug(u'Cleaning session with hash {hash}{id}: {reason}'.format(
964 hash=session_hash,
965 reason='' if reason is None else reason.value,
966 id='' if sid is None else u' (id: {})'.format(sid),
967 ))
968
969 try:
970 # XXX: we need to be sure that hash is removed from self.hash_profiles_map
971 # ONLY if it's the profile requesting the session killing
972 # otherwise, this will result in a missing hash when the 2 peers
973 # are on the same instance
974 if self.hash_profiles_map[session_hash] == client.profile:
975 del self.hash_profiles_map[session_hash]
964 except KeyError: 976 except KeyError:
965 pass 977 pass
978
979 if sid is not None:
980 try:
981 del client.xep_0065_sid_session[sid]
982 except KeyError:
983 log.warning(u"Session id {} is unknown".format(sid))
984
985 try:
986 session_data = client._s5b_sessions[session_hash]
987 except KeyError:
988 log.warning(u"There is no session with this hash")
989 return
966 else: 990 else:
967 client.xmlstream.removeObserver(session["event_data"], observer_cb) 991 del client._s5b_sessions[session_hash]
968 992
969 if session['timer'].active(): 993 try:
970 session['timer'].cancel() 994 session_data['timer'].cancel()
971 995 except (internet_error.AlreadyCalled, internet_error.AlreadyCancelled):
972 del client.xep_0065_sid_session[sid]
973
974 # FIXME: to check
975 try:
976 session_hash = session.get['hash']
977 del self.hash_profiles_map[session_hash]
978 # FIXME: check that self.hash_profiles_map is correctly cleaned in all cases (timeout, normal flow, etc).
979 except KeyError:
980 log.debug(u"Not hash found for this session")
981 pass 996 pass
982 997
983 success = failure_reason is None 998 return reason
984 stream_d = session[DEFER_KEY]
985
986 if success:
987 stream_d.callback(None)
988 else:
989 stream_d.errback(failure.Failure(exceptions.DataError(failure_reason)))
990 999
991 def startStream(self, file_obj, to_jid, sid, profile=C.PROF_KEY_NONE): 1000 def startStream(self, file_obj, to_jid, sid, profile=C.PROF_KEY_NONE):
992 """Launch the stream workflow 1001 """Launch the stream workflow
993 1002
994 @param file_obj: file_obj to send 1003 @param file_obj: file_obj to send
1016 for candidate in candidates: 1025 for candidate in candidates:
1017 streamhost = query_elt.addElement('streamhost') 1026 streamhost = query_elt.addElement('streamhost')
1018 streamhost['host'] = candidate.host 1027 streamhost['host'] = candidate.host
1019 streamhost['port'] = str(candidate.port) 1028 streamhost['port'] = str(candidate.port)
1020 streamhost['jid'] = candidate.jid.full() 1029 streamhost['jid'] = candidate.jid.full()
1030 log.debug(u"Candidate proposed: {}".format(candidate))
1021 1031
1022 d = iq_elt.send() 1032 d = iq_elt.send()
1023 args = [session_data, client] 1033 args = [session_data, client]
1024 d.addCallbacks(self._IQNegotiationCb, self._IQNegotiationEb, args, None, args) 1034 d.addCallbacks(self._IQNegotiationCb, self._IQNegotiationEb, args, None, args)
1025 1035
1045 try: 1055 try:
1046 candidate = (c for c in session_data['candidates'] if c.jid == streamhost_jid).next() 1056 candidate = (c for c in session_data['candidates'] if c.jid == streamhost_jid).next()
1047 except StopIteration: 1057 except StopIteration:
1048 log.warning(u"Candidate [{jid}] is unknown !".format(jid=streamhost_jid.full())) 1058 log.warning(u"Candidate [{jid}] is unknown !".format(jid=streamhost_jid.full()))
1049 return 1059 return
1060 else:
1061 log.info(u"Candidate choosed by target: {}".format(candidate))
1050 1062
1051 if candidate.type == XEP_0065.TYPE_PROXY: 1063 if candidate.type == XEP_0065.TYPE_PROXY:
1052 log.info(u"A Socks5 proxy is used") 1064 log.info(u"A Socks5 proxy is used")
1053 d = self.connectCandidate(candidate, session_data['hash'], profile=client.profile) 1065 d = self.connectCandidate(candidate, session_data['hash'], profile=client.profile)
1054 d.addCallback(lambda dummy: candidate.activate(session_data['id'], session_data['peer_jid'], client)) 1066 d.addCallback(lambda dummy: candidate.activate(session_data['id'], session_data['peer_jid'], client))
1088 if requester: 1100 if requester:
1089 session_hash = getSessionHash(client.jid, to_jid, sid) 1101 session_hash = getSessionHash(client.jid, to_jid, sid)
1090 session_data = self._registerHash(session_hash, file_obj, profile) 1102 session_data = self._registerHash(session_hash, file_obj, profile)
1091 else: 1103 else:
1092 session_hash = getSessionHash(to_jid, client.jid, sid) 1104 session_hash = getSessionHash(to_jid, client.jid, sid)
1105 session_d = defer.Deferred()
1106 session_d.addBoth(self._killSession, session_hash, sid, client)
1093 session_data = client._s5b_sessions[session_hash] = { 1107 session_data = client._s5b_sessions[session_hash] = {
1094 DEFER_KEY: defer.Deferred(), 1108 DEFER_KEY: session_d,
1109 TIMER_KEY: reactor.callLater(TIMEOUT, self._timeOut, session_hash, client),
1095 } 1110 }
1096 client.xep_0065_sid_session[sid] = session_data 1111 client.xep_0065_sid_session[sid] = session_data
1097 session_data.update( 1112 session_data.update(
1098 {'id': sid, 1113 {'id': sid,
1099 'peer_jid': to_jid, 1114 'peer_jid': to_jid,
1138 @param profile: %(doc_profile)s 1153 @param profile: %(doc_profile)s
1139 return (dict): session data 1154 return (dict): session data
1140 """ 1155 """
1141 client = self.host.getClient(profile) 1156 client = self.host.getClient(profile)
1142 assert session_hash not in client._s5b_sessions 1157 assert session_hash not in client._s5b_sessions
1158 session_d = defer.Deferred()
1159 session_d.addBoth(self._killSession, session_hash, None, client)
1143 session_data = client._s5b_sessions[session_hash] = { 1160 session_data = client._s5b_sessions[session_hash] = {
1144 "file": file_obj, 1161 "file": file_obj,
1145 DEFER_KEY: defer.Deferred(), 1162 DEFER_KEY: session_d,
1163 TIMER_KEY: reactor.callLater(TIMEOUT, self._timeOut, session_hash, client),
1146 } 1164 }
1147 if session_hash in self.hash_profiles_map: 1165 if session_hash in self.hash_profiles_map:
1148 # The only case when 2 profiles want to register the same hash 1166 # The only case when 2 profiles want to register the same hash
1149 # is when they are on the same instance 1167 # is when they are on the same instance
1150 log.info(u"Both Socks5 peers are on the same instance") 1168 log.info(u"Both Socks5 peers are on the same instance")
1156 # self.hash_profiles_map to get the profile, so we can ignore the second 1174 # self.hash_profiles_map to get the profile, so we can ignore the second
1157 # one (the initiator profile). 1175 # one (the initiator profile).
1158 # There is no easy way to know if the incoming connection 1176 # There is no easy way to know if the incoming connection
1159 # to the Socks5Server is from initiator or responder, so this seems a 1177 # to the Socks5Server is from initiator or responder, so this seems a
1160 # reasonable workaround. 1178 # reasonable workaround.
1179 # NOTE: this workaround is only used with XEP-0260
1161 else: 1180 else:
1162 self.hash_profiles_map[session_hash] = profile 1181 self.hash_profiles_map[session_hash] = profile
1163 1182
1164 return session_data 1183 return session_data
1165 1184
1211 1230
1212 def _ackStream(self, candidate, iq_elt, session_data, client): 1231 def _ackStream(self, candidate, iq_elt, session_data, client):
1213 if candidate is None: 1232 if candidate is None:
1214 log.info("No streamhost candidate worked, we have to end negotiation") 1233 log.info("No streamhost candidate worked, we have to end negotiation")
1215 return client.sendError(iq_elt, 'item-not-found') 1234 return client.sendError(iq_elt, 'item-not-found')
1216 log.debug(u"activating stream") 1235 log.info(u"We choose: {}".format(candidate))
1217 result_elt = xmlstream.toResponse(iq_elt, 'result') 1236 result_elt = xmlstream.toResponse(iq_elt, 'result')
1218 query_elt = result_elt.addElement((NS_BS, 'query')) 1237 query_elt = result_elt.addElement((NS_BS, 'query'))
1219 query_elt['sid'] = session_data['id'] 1238 query_elt['sid'] = session_data['id']
1220 streamhost_used_elt = query_elt.addElement('streamhost-used') 1239 streamhost_used_elt = query_elt.addElement('streamhost-used')
1221 streamhost_used_elt['jid'] = candidate.jid.full() 1240 streamhost_used_elt['jid'] = candidate.jid.full()