diff src/plugins/plugin_xep_0065.py @ 1584:b57b4683dc33

plugin XEP-0065: session cleaning and timeout + log choosed candidate when using SI File Transfer
author Goffi <goffi@goffi.org>
date Fri, 13 Nov 2015 16:46:32 +0100
parents d46aae87c03a
children a34d7f621944
line wrap: on
line diff
--- a/src/plugins/plugin_xep_0065.py	Fri Nov 13 16:46:31 2015 +0100
+++ b/src/plugins/plugin_xep_0065.py	Fri Nov 13 16:46:32 2015 +0100
@@ -68,7 +68,6 @@
 from twisted.words.protocols.jabber import xmlstream
 from twisted.protocols.basic import FileSender
 from twisted.internet import defer
-from twisted.python import failure
 from collections import namedtuple
 import struct
 import hashlib
@@ -83,20 +82,6 @@
 
 from wokkel import disco, iwokkel
 
-IQ_SET = '/iq[@type="set"]'
-NS_BS = 'http://jabber.org/protocol/bytestreams'
-BS_REQUEST = IQ_SET + '/query[@xmlns="' + NS_BS + '"]'
-TIMEOUT = 60  # timeout for workflow
-DEFER_KEY = 'finished' # key of the deferred used to track session end
-SERVER_STARTING_PORT = 0 # starting number for server port search (0 to ask automatic attribution)
-
-# priorities are candidates local priorities, must be a int between 0 and 65535
-PRIORITY_BEST_DIRECT = 10000
-PRIORITY_DIRECT = 5000
-PRIORITY_ASSISTED = 1000
-PRIORITY_PROXY = 0.2 # proxy is the last option for s5b
-CANDIDATE_DELAY = 0.2 # see XEP-0260 §4
-CANDIDATE_DELAY_PROXY = 0.2 # additional time for proxy types (see XEP-0260 §4 note 3)
 
 PLUGIN_INFO = {
     "name": "XEP 0065 Plugin",
@@ -110,6 +95,23 @@
     "description": _("""Implementation of SOCKS5 Bytestreams""")
 }
 
+IQ_SET = '/iq[@type="set"]'
+NS_BS = 'http://jabber.org/protocol/bytestreams'
+BS_REQUEST = IQ_SET + '/query[@xmlns="' + NS_BS + '"]'
+TIMER_KEY = 'timer'
+DEFER_KEY = 'finished' # key of the deferred used to track session end
+SERVER_STARTING_PORT = 0 # starting number for server port search (0 to ask automatic attribution)
+
+# priorities are candidates local priorities, must be a int between 0 and 65535
+PRIORITY_BEST_DIRECT = 10000
+PRIORITY_DIRECT = 5000
+PRIORITY_ASSISTED = 1000
+PRIORITY_PROXY = 0.2 # proxy is the last option for s5b
+CANDIDATE_DELAY = 0.2 # see XEP-0260 §4
+CANDIDATE_DELAY_PROXY = 0.2 # additional time for proxy types (see XEP-0260 §4 note 3)
+
+TIMEOUT = 300 # maxium time between session creation and stream start
+
 # XXX: by default eveything is automatic
 # TODO: use these params to force use of specific proxy/port/IP
 # PARAMS = """
@@ -570,6 +572,7 @@
         except (KeyError, IndexError):
             log.error(u"Can't start file transfer, can't find protocol")
         else:
+            session[TIMER_KEY].cancel()
             protocol.startTransfer(chunk_size)
 
     def addToSession(self, session_hash, protocol):
@@ -641,6 +644,7 @@
         return self.session
 
     def startTransfer(self, dummy=None, chunk_size=None):
+        self.session[TIMER_KEY].cancel()
         self._protocol_instance.startTransfer(chunk_size)
 
     def clientConnectionFailed(self, connector, reason):
@@ -718,7 +722,6 @@
         """
 
         if self._server_factory is None:
-            # self._server_factory = Socks5ServerFactory(self.host, self.hash_profiles_map, lambda sid, client: self._killSession(sid, client))
             self._server_factory = Socks5ServerFactory(self)
             for port in xrange(SERVER_STARTING_PORT, 65356):
                 try:
@@ -861,10 +864,9 @@
         return candidate.factory.connection
 
     def connectCandidate(self, candidate, session_hash, delay=None, profile=C.PROF_KEY_NONE):
-        """"Connect to a candidate
+        """Connect to a candidate
 
         Connection will be done with a Socks5ClientFactory
-
         @param candidate(Candidate): candidate to connect to
         @param session_hash(unicode): hash of the session
             hash is the same as hostname computer in XEP-0065 § 5.3.2 #1
@@ -938,55 +940,62 @@
         d_list.addCallback(allTested)
         return d_list
 
-    def _timeOut(self, sid, client):
-        """Delecte current_stream id, called after timeout
-        @param id: id of client.xep_0065_sid_session"""
-        log.info(_("Socks5 Bytestream: TimeOut reached for id {sid} [{profile}]").format(
-            sid=sid, profile=client.profile))
-        self._killSession(sid, client, u"TIMEOUT")
+    def _timeOut(self, session_hash, client):
+        """Called when stream was not started quickly enough
 
-    def _killSession(self, sid, client, failure_reason=None):
-        """Delete a current_stream id, clean up associated observers
+        @param session_hash(str): hash as returned by getSessionHash
+        @param client: %(doc_client)s
+        """
+        log.info(u"Socks5 Bytestream: TimeOut reached")
+        session = self.getSession(session_hash, client.profile)
+        session[DEFER_KEY].errback(exceptions.TimeOutError)
+
+    def _killSession(self, reason, session_hash, sid, client):
+        """Clean the current session
 
-        @param sid(unicode): session id
+        @param session_hash(str): hash as returned by getSessionHash
+        @param sid(None, unicode): session id
+            or None if self.xep_0065_sid_session was not used
         @param client: %(doc_client)s
-        @param failure_reason(None, unicode): if None the session is successful
-            else, will be used to call failure_cb
+        @param reason(None, failure.Failure): None if eveything was fine, a failure else
+        @return (None, failure.Failure): reason is returned
         """
-        try:
-            session = client.xep_0065_sid_session[sid]
-        except KeyError:
-            log.warning(_("kill id called on a non existant id"))
-            return
+        log.debug(u'Cleaning session with hash {hash}{id}: {reason}'.format(
+            hash=session_hash,
+            reason='' if reason is None else reason.value,
+            id='' if sid is None else u' (id: {})'.format(sid),
+            ))
 
         try:
-            observer_cb = session['observer_cb']
+            # XXX: we need to be sure that hash is removed from self.hash_profiles_map
+            #      ONLY if it's the profile requesting the session killing
+            #      otherwise, this will result in a missing hash when the 2 peers
+            #      are on the same instance
+            if self.hash_profiles_map[session_hash] == client.profile:
+                del self.hash_profiles_map[session_hash]
         except KeyError:
             pass
-        else:
-            client.xmlstream.removeObserver(session["event_data"], observer_cb)
 
-        if session['timer'].active():
-            session['timer'].cancel()
-
-        del client.xep_0065_sid_session[sid]
+        if sid is not None:
+            try:
+                del client.xep_0065_sid_session[sid]
+            except KeyError:
+                log.warning(u"Session id {} is unknown".format(sid))
 
-        # FIXME: to check
         try:
-            session_hash = session.get['hash']
-            del self.hash_profiles_map[session_hash]
-            # FIXME: check that self.hash_profiles_map is correctly cleaned in all cases (timeout, normal flow, etc).
+            session_data = client._s5b_sessions[session_hash]
         except KeyError:
-            log.debug(u"Not hash found for this session")
+            log.warning(u"There is no session with this hash")
+            return
+        else:
+            del client._s5b_sessions[session_hash]
+
+        try:
+            session_data['timer'].cancel()
+        except (internet_error.AlreadyCalled, internet_error.AlreadyCancelled):
             pass
 
-        success = failure_reason is None
-        stream_d = session[DEFER_KEY]
-
-        if success:
-            stream_d.callback(None)
-        else:
-            stream_d.errback(failure.Failure(exceptions.DataError(failure_reason)))
+        return reason
 
     def startStream(self, file_obj, to_jid, sid, profile=C.PROF_KEY_NONE):
         """Launch the stream workflow
@@ -1018,6 +1027,7 @@
                 streamhost['host'] = candidate.host
                 streamhost['port'] = str(candidate.port)
                 streamhost['jid'] = candidate.jid.full()
+                log.debug(u"Candidate proposed: {}".format(candidate))
 
             d = iq_elt.send()
             args = [session_data, client]
@@ -1047,6 +1057,8 @@
         except StopIteration:
             log.warning(u"Candidate [{jid}] is unknown !".format(jid=streamhost_jid.full()))
             return
+        else:
+            log.info(u"Candidate choosed by target: {}".format(candidate))
 
         if candidate.type == XEP_0065.TYPE_PROXY:
             log.info(u"A Socks5 proxy is used")
@@ -1090,8 +1102,11 @@
             session_data = self._registerHash(session_hash, file_obj, profile)
         else:
             session_hash = getSessionHash(to_jid, client.jid, sid)
+            session_d = defer.Deferred()
+            session_d.addBoth(self._killSession, session_hash, sid, client)
             session_data = client._s5b_sessions[session_hash] = {
-                DEFER_KEY: defer.Deferred(),
+                DEFER_KEY: session_d,
+                TIMER_KEY: reactor.callLater(TIMEOUT, self._timeOut, session_hash, client),
                 }
         client.xep_0065_sid_session[sid] = session_data
         session_data.update(
@@ -1140,9 +1155,12 @@
         """
         client = self.host.getClient(profile)
         assert session_hash not in client._s5b_sessions
+        session_d = defer.Deferred()
+        session_d.addBoth(self._killSession, session_hash, None, client)
         session_data = client._s5b_sessions[session_hash] = {
             "file": file_obj,
-            DEFER_KEY: defer.Deferred(),
+            DEFER_KEY: session_d,
+            TIMER_KEY: reactor.callLater(TIMEOUT, self._timeOut, session_hash, client),
             }
         if session_hash in self.hash_profiles_map:
             # The only case when 2 profiles want to register the same hash
@@ -1158,6 +1176,7 @@
             #     There is no easy way to known if the incoming connection
             #     to the Socks5Server is from initiator or responder, so this seams a
             #     reasonable workaround.
+            #     NOTE: this workaround is only used with XEP-0260
         else:
             self.hash_profiles_map[session_hash] = profile
 
@@ -1213,7 +1232,7 @@
         if candidate is None:
             log.info("No streamhost candidate worked, we have to end negotiation")
             return client.sendError(iq_elt, 'item-not-found')
-        log.debug(u"activating stream")
+        log.info(u"We choose: {}".format(candidate))
         result_elt = xmlstream.toResponse(iq_elt, 'result')
         query_elt = result_elt.addElement((NS_BS, 'query'))
         query_elt['sid'] = session_data['id']