changeset 1570:37d4be4a9fed

plugins XEP-0260, XEP-0065: proxy handling: - XEP-0065: Candidate.activate launch proxy activation - XEP-0065: a candidate is individually connected with connectCandidate - transport-info action handling can now manage candidate and proxy infos
author Goffi <goffi@goffi.org>
date Sun, 08 Nov 2015 14:44:33 +0100
parents 44854fb5d3b2
children c668081eba1c
files src/plugins/plugin_xep_0065.py src/plugins/plugin_xep_0260.py
diffstat 2 files changed, 165 insertions(+), 62 deletions(-) [+]
line wrap: on
line diff
--- a/src/plugins/plugin_xep_0065.py	Sun Nov 08 14:44:33 2015 +0100
+++ b/src/plugins/plugin_xep_0065.py	Sun Nov 08 14:44:33 2015 +0100
@@ -257,6 +257,23 @@
             raise exceptions.InternalError(u"Unknown {} type !".format(self.type))
         return 2**16 * multiplier + self._local_priority
 
+    def activate(self, sid, peer_jid, client):
+        """Activate the proxy candidate
+
+        Send activation request as explained in XEP-0065 § 6.3.5
+        Must only be used with proxy candidates
+        @param sid(unicode): session id (same as for getSessionHash)
+        @param peer_jid(jid.JID): jid of the other peer
+        @return (D(domish.Element)): IQ result (or error)
+        """
+        assert self.type == XEP_0065.TYPE_PROXY
+        iq_elt = client.IQ()
+        iq_elt['to'] = self.jid.full()
+        query_elt = iq_elt.addElement((NS_BS, 'query'))
+        query_elt['sid'] = sid
+        query_elt.addElement('activate', content=peer_jid.full())
+        return iq_elt.send()
+
     def startTransfer(self, session_hash=None):
         self.factory.startTransfer(session_hash)
 
@@ -280,7 +297,6 @@
         @param session_hash(str): hash of the session
             must only be used in client mode
         """
-        log.debug(_("Protocol init"))
         self.connection = defer.Deferred() # called when connection/auth is done
         if session_hash is not None:
             self.server_mode = False
@@ -313,7 +329,6 @@
         self.transport.write(struct.pack('!3B', SOCKS5_VER, 1, AUTHMECH_ANON))
 
     def _parseNegotiation(self):
-        log.debug("_parseNegotiation")
         try:
             # Parse out data
             ver, nmethod = struct.unpack('!BB', self.buf[:2])
@@ -348,7 +363,6 @@
             pass
 
     def _parseUserPass(self):
-        log.debug("_parseUserPass")
         try:
             # Parse out data
             ver, ulen = struct.unpack('BB', self.buf[:2])
@@ -370,14 +384,12 @@
             pass
 
     def sendErrorReply(self, errorcode):
-        log.debug("sendErrorReply")
         # Any other address types are not supported
         result = struct.pack('!BBBBIH', SOCKS5_VER, errorcode, 0, 1, 0, 0)
         self.transport.write(result)
         self.transport.loseConnection()
 
     def _parseRequest(self):
-        log.debug("_parseRequest")
         try:
             # Parse out data and trim buffer accordingly
             ver, cmd, rsvd, self.addressType = struct.unpack('!BBBB', self.buf[:4])
@@ -420,7 +432,6 @@
             return None
 
     def _makeRequest(self):
-        log.debug("_makeRequest")
         # sha1 = getSessionHash(self.data["from"], self.data["to"], self.sid)
         hash_ = self._session_hash
         request = struct.pack('!5B%dsH' % len(hash_), SOCKS5_VER, CMD_CONNECT, 0, ADDR_DOMAINNAME, len(hash_), hash_, 0)
@@ -428,7 +439,6 @@
         self.state = STATE_CLIENT_REQUEST
 
     def _parseRequestReply(self):
-        log.debug("_parseRequestReply")
         try:
             ver, rep, rsvd, self.addressType = struct.unpack('!BBBB', self.buf[:4])
             # Ensure we actually support the requested address type
@@ -472,8 +482,6 @@
             self._startNegotiation()
 
     def connectRequested(self, addr, port):
-        log.debug("connectRequested")
-
         # Check that this session is expected
         if not self.factory.addToSession(addr, self):
             self.sendErrorReply(REPLY_CONN_REFUSED)
@@ -497,7 +505,6 @@
         self.transport.loseConnection()
 
     def connectCompleted(self, remotehost, remoteport):
-        log.debug("connectCompleted")
         if self.addressType == ADDR_IPV4:
             result = struct.pack('!BBBBIH', SOCKS5_VER, REPLY_SUCCESS, 0, 1, remotehost, remoteport)
         elif self.addressType == ADDR_DOMAINNAME:
@@ -871,18 +878,36 @@
         candidate.factory.connector = connector
         return candidate.factory.connection
 
+    def connectCandidate(self, candidate, session_hash, delay=None, profile=C.PROF_KEY_NONE):
+        """"Connect to a candidate
+
+        Connection will be done with a Socks5ClientFactory
+
+        @param candidate(Candidate): candidate to connect to
+        @param session_hash(unicode): hash of the session
+            hash is the same as hostname computer in XEP-0065 § 5.3.2 #1
+        @param delay(None, float): optional delay to wait before connection, in seconds
+        @param profile: %(doc_profile)s
+        @return (D): Deferred launched when TCP connection + Socks5 connection is done
+        """
+        factory = Socks5ClientFactory(self, session_hash, profile)
+        candidate.factory = factory
+        if delay is None:
+            d = defer.succeed(candidate.host)
+        else:
+            d = sat_defer.DelayedDeferred(delay, candidate.host)
+        d.addCallback(reactor.connectTCP, candidate.port, factory)
+        d.addCallback(self._addConnector, candidate)
+        return d
+
     def tryCandidates(self, candidates, session_hash, connection_cb=None, connection_eb=None, profile=C.PROF_KEY_NONE):
         defers_list = []
 
         for candidate in candidates:
-            factory = Socks5ClientFactory(self, session_hash, profile)
-            candidate.factory = factory
             delay = CANDIDATE_DELAY * len(defers_list)
             if candidate.type == XEP_0065.TYPE_PROXY:
                 delay += CANDIDATE_DELAY_PROXY
-            d = sat_defer.DelayedDeferred(delay, candidate.host)
-            d.addCallback(reactor.connectTCP, candidate.port, factory)
-            d.addCallback(self._addConnector, candidate)
+            d = self.connectCandidate(candidate, session_hash, delay, profile)
             if connection_cb is not None:
                 d.addCallback(lambda dummy, candidate=candidate, profile=profile: connection_cb(candidate, profile))
             if connection_eb is not None:
--- a/src/plugins/plugin_xep_0260.py	Sun Nov 08 14:44:33 2015 +0100
+++ b/src/plugins/plugin_xep_0260.py	Sun Nov 08 14:44:33 2015 +0100
@@ -48,6 +48,10 @@
 }
 
 
+class ProxyError(Exception):
+    pass
+
+
 class XEP_0260(object):
     # TODO: udp handling
 
@@ -127,6 +131,29 @@
 
         defer.returnValue(transport_elt)
 
+    def _proxyActivatedCb(self, iq_result_elt, candidate, session, content_name, profile):
+        """Called when activation confirmation has been received from proxy
+
+        cf XEP-0260 § 2.4
+        """
+        # now that the proxy is activated, we have to inform other peer
+        iq_elt, transport_elt = self._j.buildAction(self._j.A_TRANSPORT_INFO, session, content_name, profile)
+        activated_elt = transport_elt.addElement('activated')
+        activated_elt['cid'] = candidate.id
+        iq_elt.send
+
+    def _proxyActivatedEb(self, stanza_error, candidate, session, content_name, profile):
+        """Called when activation error has been received from proxy
+
+        cf XEP-0260 § 2.4
+        """
+        # TODO: fallback to IBB
+        # now that the proxy is activated, we have to inform other peer
+        iq_elt, transport_elt = self._j.buildAction(self._j.A_TRANSPORT_INFO, session, content_name, profile)
+        transport_elt.addElement('proxy-error')
+        iq_elt.send
+        return stanza_error
+
     def _foundPeerCandidate(self, candidate, session, transport_data, content_name, client):
         """Called when the best candidate from other peer is found
 
@@ -134,7 +161,7 @@
             or None if no candidate is accessible
         @param session(dict):  session data
         @param transport_data(dict): transport data
-        @param content_name(dict): name of the current content
+        @param content_name(unicode): name of the current content
         @param client(unicode): %(doc_client)s
         """
 
@@ -154,18 +181,18 @@
             candidate_elt = transport_elt.addElement('candidate-used')
             candidate_elt['cid'] = candidate.id
         iq_elt.send() # TODO: check result stanza
-        content_data = session['contents'][content_name]
-        self._checkCandidates(session, content_data, transport_data, client)
+        self._checkCandidates(session, content_name, transport_data, client)
 
-    def _checkCandidates(self, session, content_data, transport_data, client):
+    def _checkCandidates(self, session, content_name, transport_data, client):
         """Called when a candidate has been choosed
 
         if we have both candidates, we select one, or fallback to an other transport
         @param session(dict):  session data
-        @param content_data(dict): content data
+        @param content_name(unicode): name of the current content
         @param transport_data(dict): transport data
         @param client(unicode): %(doc_client)s
         """
+        content_data = session['contents'][content_name]
         try:
             best_candidate = transport_data['best_candidate']
         except KeyError:
@@ -194,20 +221,90 @@
         if choosed_candidate is None:
             log.warning(u"Socks5 negociation failed, we need to fallback to IBB")
         else:
-            if choosed_candidate==best_candidate:
-                who = u'our'
-            else:
-                who = u'other peer'
-                best_candidate.discard()
+            # best_peer_candidate was choosed from the candidates we have sent
+            # so our_candidate is true if choosed_candidate is peer_best_candidate
+            our_candidate = choosed_candidate==peer_best_candidate
 
             log.info(u"Socks5 negociation successful, {who} candidate will be used: {candidate}".format(
-                who = who,
+                who = u'our' if our_candidate else u'other peer',
                 candidate = choosed_candidate))
             del transport_data['best_candidate']
             del transport_data['peer_best_candidate']
+
+            if choosed_candidate.type == self._s5b.TYPE_PROXY:
+                # the file transfer need to wait for proxy activation
+                # (see XEP-0260 § 2.4)
+                if our_candidate:
+                    d = self._s5b.connectCandidate(choosed_candidate, transport_data['session_hash'], profile=client.profile)
+                    d.addCallback(lambda dummy: choosed_candidate.activate(transport_data['sid'], session['peer_jid'], client))
+                    args = [choosed_candidate, session, content_name, client.profile]
+                    d.addCallbacks(self._proxyActivatedCb, self._proxyActivatedEb, args, None, args)
+                else:
+                    # this Deferred will be called when we'll receive activation confirmation from other peer
+                    d = transport_data['activation_d'] = defer.Deferred()
+            else:
+                d = defer.succeed(None)
+
             if content_data['senders'] == session['role']:
-                # we can now start the file transfer
-                choosed_candidate.startTransfer(transport_data['session_hash'])
+                # we can now start the file transfer (or start it after proxy activation)
+                d.addCallback(lambda dummy: choosed_candidate.startTransfer(transport_data['session_hash']))
+
+    def _candidateInfo(self, candidate_elt, session, content_name, transport_data, client):
+        """Called when best candidate has been received from peer (or if none is working)
+
+        @param candidate_elt(domish.Element): candidate-used or candidate-error element
+            (see XEP-0260 §2.3)
+        @param session(dict):  session data
+        @param content_name(unicode): name of the current content
+        @param transport_data(dict): transport data
+        @param client(unicode): %(doc_client)s
+        """
+        if candidate_elt.name == 'candidate-error':
+            # candidate-error, no candidate worked
+            transport_data['peer_best_candidate'] = None
+        else:
+            # candidate-used, one candidate was choosed
+            try:
+                cid = candidate_elt.attributes['cid']
+            except KeyError:
+                log.warning(u"No cid found in <candidate-used>")
+                raise exceptions.DataError
+            try:
+                candidate = (c for c in transport_data['candidates'] if c.id == cid).next()
+            except StopIteration:
+                log.warning(u"Given cid doesn't correspond to any known candidate !")
+                raise exceptions.DataError # TODO: send an error to other peer, and use better exception
+            except KeyError:
+                # a transport-info can also be intentionaly sent too early by other peer
+                # but there is little probability
+                log.error(u'"candidates" key doesn\'t exists in transport_data, it should at this point')
+                raise exceptions.InternalError
+            # at this point we have the candidate choosed by other peer
+            transport_data['peer_best_candidate'] = candidate
+            log.info(u"Other peer best candidate: {}".format(candidate))
+
+        del transport_data['candidates']
+        self._checkCandidates(session, content_name, transport_data, client)
+
+    def _proxyActivationInfo(self, proxy_elt, session, content_name, transport_data, client):
+        """Called when proxy has been activated (or has sent an error)
+
+        @param proxy_elt(domish.Element): <activated/> or <proxy-error/> element
+            (see XEP-0260 §2.4)
+        @param session(dict):  session data
+        @param content_name(unicode): name of the current content
+        @param transport_data(dict): transport data
+        @param client(unicode): %(doc_client)s
+        """
+        try:
+            activation_d = transport_data.pop('activation_d')
+        except KeyError:
+            log.warning(u"Received unexpected transport-info for proxy activation")
+
+        if proxy_elt.name == 'activated':
+            activation_d.callback(None)
+        else:
+            activation_d.errback(ProxyError)
 
     @defer.inlineCallbacks
     def jingleHandler(self, action, session, content_name, transport_elt, profile):
@@ -256,41 +353,22 @@
             transport_elt = self._buildCandidates(session, candidates, sid, session_hash, client)
 
         elif action == self._j.A_TRANSPORT_INFO:
-            # other peer gave us its choosed candidate
-            try:
-                candidate_elt = transport_elt.elements(NS_JINGLE_S5B, 'candidate-used').next()
-            except StopIteration:
-                try:
-                    candidate_elt = transport_elt.elements(NS_JINGLE_S5B, 'candidate-error').next()
-                except StopIteration:
-                    log.warning(u"Unexpected transport element: {}".format(transport_elt.toXml()))
-                    raise exceptions.DataError
-                else:
-                    # candidate-error, no candidate worked
-                    transport_data['peer_best_candidate'] = None
-            else:
-                # candidate-used, one candidate was choosed
-                try:
-                    cid = candidate_elt.attributes['cid']
-                except KeyError:
-                    log.warning(u"No cid found in <candidate-used>")
-                    raise exceptions.DataError
-                try:
-                    candidate = (c for c in transport_data['candidates'] if c.id == cid).next()
-                except StopIteration:
-                    log.warning(u"Given cid doesn't correspond to any known candidate !")
-                    raise exceptions.DataError # TODO: send an error to other peer, and use better exception
-                except KeyError:
-                    # a transport-info can also be intentionaly sent too early by other peer
-                    # but there is little probability
-                    log.error(u'"candidates" key doesn\'t exists in transport_data, it should at this point')
-                    raise exceptions.InternalError
-                # at this point we have the candidate choosed by other peer
-                transport_data['peer_best_candidate'] = candidate
-                log.info(u"Other peer best candidate: {}".format(candidate))
+            # transport-info can be about candidate or proxy activation
+            candidate_elt = None
 
-            del transport_data['candidates']
-            self._checkCandidates(session, content_data, transport_data, client)
+            for method, names in ((self._candidateInfo, ('candidate-used', 'candidate-error')),
+                                  (self._proxyActivationInfo, ('activated', 'proxy-error'))):
+                for name in names:
+                    try:
+                        candidate_elt = transport_elt.elements(NS_JINGLE_S5B, name).next()
+                    except StopIteration:
+                        continue
+                    else:
+                        method(candidate_elt, session, content_name, transport_data, client)
+                        break
+
+            if candidate_elt is None:
+                log.warning(u"Unexpected transport element: {}".format(transport_elt.toXml()))
 
         else:
             log.warning(u"FIXME: unmanaged action {}".format(action))