# HG changeset patch # User Goffi # Date 1447429592 -3600 # Node ID b57b4683dc3376a3b36bdbe27971b424c6e336cf # Parent d46aae87c03a86c525b41cf667f45108ba37eaaa plugin XEP-0065: session cleaning and timeout + log choosed candidate when using SI File Transfer diff -r d46aae87c03a -r b57b4683dc33 src/core/exceptions.py --- a/src/core/exceptions.py Fri Nov 13 16:46:31 2015 +0100 +++ b/src/core/exceptions.py Fri Nov 13 16:46:32 2015 +0100 @@ -72,6 +72,10 @@ pass +class TimeOutError(Exception): + pass + + class CancelError(Exception): pass diff -r d46aae87c03a -r b57b4683dc33 src/plugins/plugin_xep_0065.py --- a/src/plugins/plugin_xep_0065.py Fri Nov 13 16:46:31 2015 +0100 +++ b/src/plugins/plugin_xep_0065.py Fri Nov 13 16:46:32 2015 +0100 @@ -68,7 +68,6 @@ from twisted.words.protocols.jabber import xmlstream from twisted.protocols.basic import FileSender from twisted.internet import defer -from twisted.python import failure from collections import namedtuple import struct import hashlib @@ -83,20 +82,6 @@ from wokkel import disco, iwokkel -IQ_SET = '/iq[@type="set"]' -NS_BS = 'http://jabber.org/protocol/bytestreams' -BS_REQUEST = IQ_SET + '/query[@xmlns="' + NS_BS + '"]' -TIMEOUT = 60 # timeout for workflow -DEFER_KEY = 'finished' # key of the deferred used to track session end -SERVER_STARTING_PORT = 0 # starting number for server port search (0 to ask automatic attribution) - -# priorities are candidates local priorities, must be a int between 0 and 65535 -PRIORITY_BEST_DIRECT = 10000 -PRIORITY_DIRECT = 5000 -PRIORITY_ASSISTED = 1000 -PRIORITY_PROXY = 0.2 # proxy is the last option for s5b -CANDIDATE_DELAY = 0.2 # see XEP-0260 §4 -CANDIDATE_DELAY_PROXY = 0.2 # additional time for proxy types (see XEP-0260 §4 note 3) PLUGIN_INFO = { "name": "XEP 0065 Plugin", @@ -110,6 +95,23 @@ "description": _("""Implementation of SOCKS5 Bytestreams""") } +IQ_SET = '/iq[@type="set"]' +NS_BS = 'http://jabber.org/protocol/bytestreams' +BS_REQUEST = IQ_SET + '/query[@xmlns="' + NS_BS + '"]' +TIMER_KEY = 'timer' +DEFER_KEY = 'finished' # key of the deferred used to track session end +SERVER_STARTING_PORT = 0 # starting number for server port search (0 to ask automatic attribution) + +# priorities are candidates local priorities, must be a int between 0 and 65535 +PRIORITY_BEST_DIRECT = 10000 +PRIORITY_DIRECT = 5000 +PRIORITY_ASSISTED = 1000 +PRIORITY_PROXY = 0.2 # proxy is the last option for s5b +CANDIDATE_DELAY = 0.2 # see XEP-0260 §4 +CANDIDATE_DELAY_PROXY = 0.2 # additional time for proxy types (see XEP-0260 §4 note 3) + +TIMEOUT = 300 # maxium time between session creation and stream start + # XXX: by default eveything is automatic # TODO: use these params to force use of specific proxy/port/IP # PARAMS = """ @@ -570,6 +572,7 @@ except (KeyError, IndexError): log.error(u"Can't start file transfer, can't find protocol") else: + session[TIMER_KEY].cancel() protocol.startTransfer(chunk_size) def addToSession(self, session_hash, protocol): @@ -641,6 +644,7 @@ return self.session def startTransfer(self, dummy=None, chunk_size=None): + self.session[TIMER_KEY].cancel() self._protocol_instance.startTransfer(chunk_size) def clientConnectionFailed(self, connector, reason): @@ -718,7 +722,6 @@ """ if self._server_factory is None: - # self._server_factory = Socks5ServerFactory(self.host, self.hash_profiles_map, lambda sid, client: self._killSession(sid, client)) self._server_factory = Socks5ServerFactory(self) for port in xrange(SERVER_STARTING_PORT, 65356): try: @@ -861,10 +864,9 @@ return candidate.factory.connection def connectCandidate(self, candidate, session_hash, delay=None, profile=C.PROF_KEY_NONE): - """"Connect to a candidate + """Connect to a candidate Connection will be done with a Socks5ClientFactory - @param candidate(Candidate): candidate to connect to @param session_hash(unicode): hash of the session hash is the same as hostname computer in XEP-0065 § 5.3.2 #1 @@ -938,55 +940,62 @@ d_list.addCallback(allTested) return d_list - def _timeOut(self, sid, client): - """Delecte current_stream id, called after timeout - @param id: id of client.xep_0065_sid_session""" - log.info(_("Socks5 Bytestream: TimeOut reached for id {sid} [{profile}]").format( - sid=sid, profile=client.profile)) - self._killSession(sid, client, u"TIMEOUT") + def _timeOut(self, session_hash, client): + """Called when stream was not started quickly enough - def _killSession(self, sid, client, failure_reason=None): - """Delete a current_stream id, clean up associated observers + @param session_hash(str): hash as returned by getSessionHash + @param client: %(doc_client)s + """ + log.info(u"Socks5 Bytestream: TimeOut reached") + session = self.getSession(session_hash, client.profile) + session[DEFER_KEY].errback(exceptions.TimeOutError) + + def _killSession(self, reason, session_hash, sid, client): + """Clean the current session - @param sid(unicode): session id + @param session_hash(str): hash as returned by getSessionHash + @param sid(None, unicode): session id + or None if self.xep_0065_sid_session was not used @param client: %(doc_client)s - @param failure_reason(None, unicode): if None the session is successful - else, will be used to call failure_cb + @param reason(None, failure.Failure): None if eveything was fine, a failure else + @return (None, failure.Failure): reason is returned """ - try: - session = client.xep_0065_sid_session[sid] - except KeyError: - log.warning(_("kill id called on a non existant id")) - return + log.debug(u'Cleaning session with hash {hash}{id}: {reason}'.format( + hash=session_hash, + reason='' if reason is None else reason.value, + id='' if sid is None else u' (id: {})'.format(sid), + )) try: - observer_cb = session['observer_cb'] + # XXX: we need to be sure that hash is removed from self.hash_profiles_map + # ONLY if it's the profile requesting the session killing + # otherwise, this will result in a missing hash when the 2 peers + # are on the same instance + if self.hash_profiles_map[session_hash] == client.profile: + del self.hash_profiles_map[session_hash] except KeyError: pass - else: - client.xmlstream.removeObserver(session["event_data"], observer_cb) - if session['timer'].active(): - session['timer'].cancel() - - del client.xep_0065_sid_session[sid] + if sid is not None: + try: + del client.xep_0065_sid_session[sid] + except KeyError: + log.warning(u"Session id {} is unknown".format(sid)) - # FIXME: to check try: - session_hash = session.get['hash'] - del self.hash_profiles_map[session_hash] - # FIXME: check that self.hash_profiles_map is correctly cleaned in all cases (timeout, normal flow, etc). + session_data = client._s5b_sessions[session_hash] except KeyError: - log.debug(u"Not hash found for this session") + log.warning(u"There is no session with this hash") + return + else: + del client._s5b_sessions[session_hash] + + try: + session_data['timer'].cancel() + except (internet_error.AlreadyCalled, internet_error.AlreadyCancelled): pass - success = failure_reason is None - stream_d = session[DEFER_KEY] - - if success: - stream_d.callback(None) - else: - stream_d.errback(failure.Failure(exceptions.DataError(failure_reason))) + return reason def startStream(self, file_obj, to_jid, sid, profile=C.PROF_KEY_NONE): """Launch the stream workflow @@ -1018,6 +1027,7 @@ streamhost['host'] = candidate.host streamhost['port'] = str(candidate.port) streamhost['jid'] = candidate.jid.full() + log.debug(u"Candidate proposed: {}".format(candidate)) d = iq_elt.send() args = [session_data, client] @@ -1047,6 +1057,8 @@ except StopIteration: log.warning(u"Candidate [{jid}] is unknown !".format(jid=streamhost_jid.full())) return + else: + log.info(u"Candidate choosed by target: {}".format(candidate)) if candidate.type == XEP_0065.TYPE_PROXY: log.info(u"A Socks5 proxy is used") @@ -1090,8 +1102,11 @@ session_data = self._registerHash(session_hash, file_obj, profile) else: session_hash = getSessionHash(to_jid, client.jid, sid) + session_d = defer.Deferred() + session_d.addBoth(self._killSession, session_hash, sid, client) session_data = client._s5b_sessions[session_hash] = { - DEFER_KEY: defer.Deferred(), + DEFER_KEY: session_d, + TIMER_KEY: reactor.callLater(TIMEOUT, self._timeOut, session_hash, client), } client.xep_0065_sid_session[sid] = session_data session_data.update( @@ -1140,9 +1155,12 @@ """ client = self.host.getClient(profile) assert session_hash not in client._s5b_sessions + session_d = defer.Deferred() + session_d.addBoth(self._killSession, session_hash, None, client) session_data = client._s5b_sessions[session_hash] = { "file": file_obj, - DEFER_KEY: defer.Deferred(), + DEFER_KEY: session_d, + TIMER_KEY: reactor.callLater(TIMEOUT, self._timeOut, session_hash, client), } if session_hash in self.hash_profiles_map: # The only case when 2 profiles want to register the same hash @@ -1158,6 +1176,7 @@ # There is no easy way to known if the incoming connection # to the Socks5Server is from initiator or responder, so this seams a # reasonable workaround. + # NOTE: this workaround is only used with XEP-0260 else: self.hash_profiles_map[session_hash] = profile @@ -1213,7 +1232,7 @@ if candidate is None: log.info("No streamhost candidate worked, we have to end negotiation") return client.sendError(iq_elt, 'item-not-found') - log.debug(u"activating stream") + log.info(u"We choose: {}".format(candidate)) result_elt = xmlstream.toResponse(iq_elt, 'result') query_elt = result_elt.addElement((NS_BS, 'query')) query_elt['sid'] = session_data['id']