Mercurial > libervia-backend
comparison src/plugins/plugin_xep_0065.py @ 1584:b57b4683dc33
plugin XEP-0065: session cleaning and timeout + log chosen candidate when using SI File Transfer
author | Goffi <goffi@goffi.org> |
---|---|
date | Fri, 13 Nov 2015 16:46:32 +0100 |
parents | d46aae87c03a |
children | a34d7f621944 |
comparison
equal
deleted
inserted
replaced
1583:d46aae87c03a | 1584:b57b4683dc33 |
---|---|
66 from twisted.words.protocols.jabber import error as jabber_error | 66 from twisted.words.protocols.jabber import error as jabber_error |
67 from twisted.words.protocols.jabber import jid | 67 from twisted.words.protocols.jabber import jid |
68 from twisted.words.protocols.jabber import xmlstream | 68 from twisted.words.protocols.jabber import xmlstream |
69 from twisted.protocols.basic import FileSender | 69 from twisted.protocols.basic import FileSender |
70 from twisted.internet import defer | 70 from twisted.internet import defer |
71 from twisted.python import failure | |
72 from collections import namedtuple | 71 from collections import namedtuple |
73 import struct | 72 import struct |
74 import hashlib | 73 import hashlib |
75 import uuid | 74 import uuid |
76 | 75 |
81 except ImportError: | 80 except ImportError: |
82 from wokkel.subprotocols import XMPPHandler | 81 from wokkel.subprotocols import XMPPHandler |
83 | 82 |
84 from wokkel import disco, iwokkel | 83 from wokkel import disco, iwokkel |
85 | 84 |
86 IQ_SET = '/iq[@type="set"]' | |
87 NS_BS = 'http://jabber.org/protocol/bytestreams' | |
88 BS_REQUEST = IQ_SET + '/query[@xmlns="' + NS_BS + '"]' | |
89 TIMEOUT = 60 # timeout for workflow | |
90 DEFER_KEY = 'finished' # key of the deferred used to track session end | |
91 SERVER_STARTING_PORT = 0 # starting number for server port search (0 to ask automatic attribution) | |
92 | |
93 # priorities are candidates local priorities, must be a int between 0 and 65535 | |
94 PRIORITY_BEST_DIRECT = 10000 | |
95 PRIORITY_DIRECT = 5000 | |
96 PRIORITY_ASSISTED = 1000 | |
97 PRIORITY_PROXY = 0.2 # proxy is the last option for s5b | |
98 CANDIDATE_DELAY = 0.2 # see XEP-0260 §4 | |
99 CANDIDATE_DELAY_PROXY = 0.2 # additional time for proxy types (see XEP-0260 §4 note 3) | |
100 | 85 |
101 PLUGIN_INFO = { | 86 PLUGIN_INFO = { |
102 "name": "XEP 0065 Plugin", | 87 "name": "XEP 0065 Plugin", |
103 "import_name": "XEP-0065", | 88 "import_name": "XEP-0065", |
104 "type": "XEP", | 89 "type": "XEP", |
107 "recommendations": ["NAT-PORT"], | 92 "recommendations": ["NAT-PORT"], |
108 "main": "XEP_0065", | 93 "main": "XEP_0065", |
109 "handler": "yes", | 94 "handler": "yes", |
110 "description": _("""Implementation of SOCKS5 Bytestreams""") | 95 "description": _("""Implementation of SOCKS5 Bytestreams""") |
111 } | 96 } |
97 | |
98 IQ_SET = '/iq[@type="set"]' | |
99 NS_BS = 'http://jabber.org/protocol/bytestreams' | |
100 BS_REQUEST = IQ_SET + '/query[@xmlns="' + NS_BS + '"]' | |
101 TIMER_KEY = 'timer' | |
102 DEFER_KEY = 'finished' # key of the deferred used to track session end | |
103 SERVER_STARTING_PORT = 0 # starting number for server port search (0 to ask automatic attribution) | |
104 | |
105 # priorities are candidates local priorities, must be a int between 0 and 65535 | |
106 PRIORITY_BEST_DIRECT = 10000 | |
107 PRIORITY_DIRECT = 5000 | |
108 PRIORITY_ASSISTED = 1000 | |
109 PRIORITY_PROXY = 0.2 # proxy is the last option for s5b | |
110 CANDIDATE_DELAY = 0.2 # see XEP-0260 §4 | |
111 CANDIDATE_DELAY_PROXY = 0.2 # additional time for proxy types (see XEP-0260 §4 note 3) | |
112 | |
113 TIMEOUT = 300 # maximum time between session creation and stream start | |
112 | 114 |
113 # XXX: by default everything is automatic | 115 # XXX: by default everything is automatic |
114 # TODO: use these params to force use of specific proxy/port/IP | 116 # TODO: use these params to force use of specific proxy/port/IP |
115 # PARAMS = """ | 117 # PARAMS = """ |
116 # <params> | 118 # <params> |
568 try: | 570 try: |
569 protocol = session['protocols'][0] | 571 protocol = session['protocols'][0] |
570 except (KeyError, IndexError): | 572 except (KeyError, IndexError): |
571 log.error(u"Can't start file transfer, can't find protocol") | 573 log.error(u"Can't start file transfer, can't find protocol") |
572 else: | 574 else: |
575 session[TIMER_KEY].cancel() | |
573 protocol.startTransfer(chunk_size) | 576 protocol.startTransfer(chunk_size) |
574 | 577 |
575 def addToSession(self, session_hash, protocol): | 578 def addToSession(self, session_hash, protocol): |
576 """Check if session_hash is valid, and associate protocol with it | 579 """Check if session_hash is valid, and associate protocol with it |
577 | 580 |
639 | 642 |
640 def getSession(self): | 643 def getSession(self): |
641 return self.session | 644 return self.session |
642 | 645 |
643 def startTransfer(self, dummy=None, chunk_size=None): | 646 def startTransfer(self, dummy=None, chunk_size=None): |
647 self.session[TIMER_KEY].cancel() | |
644 self._protocol_instance.startTransfer(chunk_size) | 648 self._protocol_instance.startTransfer(chunk_size) |
645 | 649 |
646 def clientConnectionFailed(self, connector, reason): | 650 def clientConnectionFailed(self, connector, reason): |
647 log.debug(u"Connection failed") | 651 log.debug(u"Connection failed") |
648 self.connection.errback(reason) | 652 self.connection.errback(reason) |
716 The server is created if it doesn't exist yet | 720 The server is created if it doesn't exist yet |
717 self._server_factory_port is set on server creation | 721 self._server_factory_port is set on server creation |
718 """ | 722 """ |
719 | 723 |
720 if self._server_factory is None: | 724 if self._server_factory is None: |
721 # self._server_factory = Socks5ServerFactory(self.host, self.hash_profiles_map, lambda sid, client: self._killSession(sid, client)) | |
722 self._server_factory = Socks5ServerFactory(self) | 725 self._server_factory = Socks5ServerFactory(self) |
723 for port in xrange(SERVER_STARTING_PORT, 65356): | 726 for port in xrange(SERVER_STARTING_PORT, 65356): |
724 try: | 727 try: |
725 listening_port = reactor.listenTCP(port, self._server_factory) | 728 listening_port = reactor.listenTCP(port, self._server_factory) |
726 except internet_error.CannotListenError as e: | 729 except internet_error.CannotListenError as e: |
859 """ | 862 """ |
860 candidate.factory.connector = connector | 863 candidate.factory.connector = connector |
861 return candidate.factory.connection | 864 return candidate.factory.connection |
862 | 865 |
863 def connectCandidate(self, candidate, session_hash, delay=None, profile=C.PROF_KEY_NONE): | 866 def connectCandidate(self, candidate, session_hash, delay=None, profile=C.PROF_KEY_NONE): |
864 """"Connect to a candidate | 867 """Connect to a candidate |
865 | 868 |
866 Connection will be done with a Socks5ClientFactory | 869 Connection will be done with a Socks5ClientFactory |
867 | |
868 @param candidate(Candidate): candidate to connect to | 870 @param candidate(Candidate): candidate to connect to |
869 @param session_hash(unicode): hash of the session | 871 @param session_hash(unicode): hash of the session |
870 hash is the same as hostname computed in XEP-0065 § 5.3.2 #1 | 872 hash is the same as hostname computed in XEP-0065 § 5.3.2 #1 |
871 @param delay(None, float): optional delay to wait before connection, in seconds | 873 @param delay(None, float): optional delay to wait before connection, in seconds |
872 @param profile: %(doc_profile)s | 874 @param profile: %(doc_profile)s |
936 defer_candidates = self.tryCandidates(candidates, session_hash, connectionCb, connectionEb, profile) | 938 defer_candidates = self.tryCandidates(candidates, session_hash, connectionCb, connectionEb, profile) |
937 d_list = defer.DeferredList(defer_candidates) | 939 d_list = defer.DeferredList(defer_candidates) |
938 d_list.addCallback(allTested) | 940 d_list.addCallback(allTested) |
939 return d_list | 941 return d_list |
940 | 942 |
941 def _timeOut(self, sid, client): | 943 def _timeOut(self, session_hash, client): |
942 """Delecte current_stream id, called after timeout | 944 """Called when stream was not started quickly enough |
943 @param id: id of client.xep_0065_sid_session""" | 945 |
944 log.info(_("Socks5 Bytestream: TimeOut reached for id {sid} [{profile}]").format( | 946 @param session_hash(str): hash as returned by getSessionHash |
945 sid=sid, profile=client.profile)) | |
946 self._killSession(sid, client, u"TIMEOUT") | |
947 | |
948 def _killSession(self, sid, client, failure_reason=None): | |
949 """Delete a current_stream id, clean up associated observers | |
950 | |
951 @param sid(unicode): session id | |
952 @param client: %(doc_client)s | 947 @param client: %(doc_client)s |
953 @param failure_reason(None, unicode): if None the session is successful | 948 """ |
954 else, will be used to call failure_cb | 949 log.info(u"Socks5 Bytestream: TimeOut reached") |
955 """ | 950 session = self.getSession(session_hash, client.profile) |
956 try: | 951 session[DEFER_KEY].errback(exceptions.TimeOutError) |
957 session = client.xep_0065_sid_session[sid] | 952 |
958 except KeyError: | 953 def _killSession(self, reason, session_hash, sid, client): |
959 log.warning(_("kill id called on a non existant id")) | 954 """Clean the current session |
960 return | 955 |
961 | 956 @param session_hash(str): hash as returned by getSessionHash |
962 try: | 957 @param sid(None, unicode): session id |
963 observer_cb = session['observer_cb'] | 958 or None if self.xep_0065_sid_session was not used |
959 @param client: %(doc_client)s | |
960 @param reason(None, failure.Failure): None if everything was fine, else a failure | |
961 @return (None, failure.Failure): reason is returned | |
962 """ | |
963 log.debug(u'Cleaning session with hash {hash}{id}: {reason}'.format( | |
964 hash=session_hash, | |
965 reason='' if reason is None else reason.value, | |
966 id='' if sid is None else u' (id: {})'.format(sid), | |
967 )) | |
968 | |
969 try: | |
970 # XXX: we need to be sure that hash is removed from self.hash_profiles_map | |
971 # ONLY if it's the profile requesting the session killing | |
972 # otherwise, this will result in a missing hash when the 2 peers | |
973 # are on the same instance | |
974 if self.hash_profiles_map[session_hash] == client.profile: | |
975 del self.hash_profiles_map[session_hash] | |
964 except KeyError: | 976 except KeyError: |
965 pass | 977 pass |
978 | |
979 if sid is not None: | |
980 try: | |
981 del client.xep_0065_sid_session[sid] | |
982 except KeyError: | |
983 log.warning(u"Session id {} is unknown".format(sid)) | |
984 | |
985 try: | |
986 session_data = client._s5b_sessions[session_hash] | |
987 except KeyError: | |
988 log.warning(u"There is no session with this hash") | |
989 return | |
966 else: | 990 else: |
967 client.xmlstream.removeObserver(session["event_data"], observer_cb) | 991 del client._s5b_sessions[session_hash] |
968 | 992 |
969 if session['timer'].active(): | 993 try: |
970 session['timer'].cancel() | 994 session_data['timer'].cancel() |
971 | 995 except (internet_error.AlreadyCalled, internet_error.AlreadyCancelled): |
972 del client.xep_0065_sid_session[sid] | |
973 | |
974 # FIXME: to check | |
975 try: | |
976 session_hash = session.get['hash'] | |
977 del self.hash_profiles_map[session_hash] | |
978 # FIXME: check that self.hash_profiles_map is correctly cleaned in all cases (timeout, normal flow, etc). | |
979 except KeyError: | |
980 log.debug(u"Not hash found for this session") | |
981 pass | 996 pass |
982 | 997 |
983 success = failure_reason is None | 998 return reason |
984 stream_d = session[DEFER_KEY] | |
985 | |
986 if success: | |
987 stream_d.callback(None) | |
988 else: | |
989 stream_d.errback(failure.Failure(exceptions.DataError(failure_reason))) | |
990 | 999 |
991 def startStream(self, file_obj, to_jid, sid, profile=C.PROF_KEY_NONE): | 1000 def startStream(self, file_obj, to_jid, sid, profile=C.PROF_KEY_NONE): |
992 """Launch the stream workflow | 1001 """Launch the stream workflow |
993 | 1002 |
994 @param file_obj: file_obj to send | 1003 @param file_obj: file_obj to send |
1016 for candidate in candidates: | 1025 for candidate in candidates: |
1017 streamhost = query_elt.addElement('streamhost') | 1026 streamhost = query_elt.addElement('streamhost') |
1018 streamhost['host'] = candidate.host | 1027 streamhost['host'] = candidate.host |
1019 streamhost['port'] = str(candidate.port) | 1028 streamhost['port'] = str(candidate.port) |
1020 streamhost['jid'] = candidate.jid.full() | 1029 streamhost['jid'] = candidate.jid.full() |
1030 log.debug(u"Candidate proposed: {}".format(candidate)) | |
1021 | 1031 |
1022 d = iq_elt.send() | 1032 d = iq_elt.send() |
1023 args = [session_data, client] | 1033 args = [session_data, client] |
1024 d.addCallbacks(self._IQNegotiationCb, self._IQNegotiationEb, args, None, args) | 1034 d.addCallbacks(self._IQNegotiationCb, self._IQNegotiationEb, args, None, args) |
1025 | 1035 |
1045 try: | 1055 try: |
1046 candidate = (c for c in session_data['candidates'] if c.jid == streamhost_jid).next() | 1056 candidate = (c for c in session_data['candidates'] if c.jid == streamhost_jid).next() |
1047 except StopIteration: | 1057 except StopIteration: |
1048 log.warning(u"Candidate [{jid}] is unknown !".format(jid=streamhost_jid.full())) | 1058 log.warning(u"Candidate [{jid}] is unknown !".format(jid=streamhost_jid.full())) |
1049 return | 1059 return |
1060 else: | |
1061 log.info(u"Candidate choosed by target: {}".format(candidate)) | |
1050 | 1062 |
1051 if candidate.type == XEP_0065.TYPE_PROXY: | 1063 if candidate.type == XEP_0065.TYPE_PROXY: |
1052 log.info(u"A Socks5 proxy is used") | 1064 log.info(u"A Socks5 proxy is used") |
1053 d = self.connectCandidate(candidate, session_data['hash'], profile=client.profile) | 1065 d = self.connectCandidate(candidate, session_data['hash'], profile=client.profile) |
1054 d.addCallback(lambda dummy: candidate.activate(session_data['id'], session_data['peer_jid'], client)) | 1066 d.addCallback(lambda dummy: candidate.activate(session_data['id'], session_data['peer_jid'], client)) |
1088 if requester: | 1100 if requester: |
1089 session_hash = getSessionHash(client.jid, to_jid, sid) | 1101 session_hash = getSessionHash(client.jid, to_jid, sid) |
1090 session_data = self._registerHash(session_hash, file_obj, profile) | 1102 session_data = self._registerHash(session_hash, file_obj, profile) |
1091 else: | 1103 else: |
1092 session_hash = getSessionHash(to_jid, client.jid, sid) | 1104 session_hash = getSessionHash(to_jid, client.jid, sid) |
1105 session_d = defer.Deferred() | |
1106 session_d.addBoth(self._killSession, session_hash, sid, client) | |
1093 session_data = client._s5b_sessions[session_hash] = { | 1107 session_data = client._s5b_sessions[session_hash] = { |
1094 DEFER_KEY: defer.Deferred(), | 1108 DEFER_KEY: session_d, |
1109 TIMER_KEY: reactor.callLater(TIMEOUT, self._timeOut, session_hash, client), | |
1095 } | 1110 } |
1096 client.xep_0065_sid_session[sid] = session_data | 1111 client.xep_0065_sid_session[sid] = session_data |
1097 session_data.update( | 1112 session_data.update( |
1098 {'id': sid, | 1113 {'id': sid, |
1099 'peer_jid': to_jid, | 1114 'peer_jid': to_jid, |
1138 @param profile: %(doc_profile)s | 1153 @param profile: %(doc_profile)s |
1139 return (dict): session data | 1154 return (dict): session data |
1140 """ | 1155 """ |
1141 client = self.host.getClient(profile) | 1156 client = self.host.getClient(profile) |
1142 assert session_hash not in client._s5b_sessions | 1157 assert session_hash not in client._s5b_sessions |
1158 session_d = defer.Deferred() | |
1159 session_d.addBoth(self._killSession, session_hash, None, client) | |
1143 session_data = client._s5b_sessions[session_hash] = { | 1160 session_data = client._s5b_sessions[session_hash] = { |
1144 "file": file_obj, | 1161 "file": file_obj, |
1145 DEFER_KEY: defer.Deferred(), | 1162 DEFER_KEY: session_d, |
1163 TIMER_KEY: reactor.callLater(TIMEOUT, self._timeOut, session_hash, client), | |
1146 } | 1164 } |
1147 if session_hash in self.hash_profiles_map: | 1165 if session_hash in self.hash_profiles_map: |
1148 # The only case when 2 profiles want to register the same hash | 1166 # The only case when 2 profiles want to register the same hash |
1149 # is when they are on the same instance | 1167 # is when they are on the same instance |
1150 log.info(u"Both Socks5 peers are on the same instance") | 1168 log.info(u"Both Socks5 peers are on the same instance") |
1156 # self.hash_profiles_map to get the profile, so we can ignore the second | 1174 # self.hash_profiles_map to get the profile, so we can ignore the second |
1157 # one (the initiator profile). | 1175 # one (the initiator profile). |
1158 # There is no easy way to know if the incoming connection | 1176 # There is no easy way to know if the incoming connection |
1159 # to the Socks5Server is from initiator or responder, so this seems a | 1177 # to the Socks5Server is from initiator or responder, so this seems a |
1160 # reasonable workaround. | 1178 # reasonable workaround. |
1179 # NOTE: this workaround is only used with XEP-0260 | |
1161 else: | 1180 else: |
1162 self.hash_profiles_map[session_hash] = profile | 1181 self.hash_profiles_map[session_hash] = profile |
1163 | 1182 |
1164 return session_data | 1183 return session_data |
1165 | 1184 |
1211 | 1230 |
1212 def _ackStream(self, candidate, iq_elt, session_data, client): | 1231 def _ackStream(self, candidate, iq_elt, session_data, client): |
1213 if candidate is None: | 1232 if candidate is None: |
1214 log.info("No streamhost candidate worked, we have to end negotiation") | 1233 log.info("No streamhost candidate worked, we have to end negotiation") |
1215 return client.sendError(iq_elt, 'item-not-found') | 1234 return client.sendError(iq_elt, 'item-not-found') |
1216 log.debug(u"activating stream") | 1235 log.info(u"We choose: {}".format(candidate)) |
1217 result_elt = xmlstream.toResponse(iq_elt, 'result') | 1236 result_elt = xmlstream.toResponse(iq_elt, 'result') |
1218 query_elt = result_elt.addElement((NS_BS, 'query')) | 1237 query_elt = result_elt.addElement((NS_BS, 'query')) |
1219 query_elt['sid'] = session_data['id'] | 1238 query_elt['sid'] = session_data['id'] |
1220 streamhost_used_elt = query_elt.addElement('streamhost-used') | 1239 streamhost_used_elt = query_elt.addElement('streamhost-used') |
1221 streamhost_used_elt['jid'] = candidate.jid.full() | 1240 streamhost_used_elt['jid'] = candidate.jid.full() |