# HG changeset patch # User Goffi # Date 1446990273 -3600 # Node ID 37d4be4a9fed3a443881fc10c6c133426ecd539a # Parent 44854fb5d3b29c2d3e16f0896f7c7fed39f6f172 plugins XEP-0260, XEP-0065: proxy handling: - XEP-0065: Candidate.activate launch proxy activation - XEP-0065: a candidate is individually connected with connectCandidate - transport-info action handling can now manage candidate and proxy infos diff -r 44854fb5d3b2 -r 37d4be4a9fed src/plugins/plugin_xep_0065.py --- a/src/plugins/plugin_xep_0065.py Sun Nov 08 14:44:33 2015 +0100 +++ b/src/plugins/plugin_xep_0065.py Sun Nov 08 14:44:33 2015 +0100 @@ -257,6 +257,23 @@ raise exceptions.InternalError(u"Unknown {} type !".format(self.type)) return 2**16 * multiplier + self._local_priority + def activate(self, sid, peer_jid, client): + """Activate the proxy candidate + + Send activation request as explained in XEP-0065 § 6.3.5 + Must only be used with proxy candidates + @param sid(unicode): session id (same as for getSessionHash) + @param peer_jid(jid.JID): jid of the other peer + @return (D(domish.Element)): IQ result (or error) + """ + assert self.type == XEP_0065.TYPE_PROXY + iq_elt = client.IQ() + iq_elt['to'] = self.jid.full() + query_elt = iq_elt.addElement((NS_BS, 'query')) + query_elt['sid'] = sid + query_elt.addElement('activate', content=peer_jid.full()) + return iq_elt.send() + def startTransfer(self, session_hash=None): self.factory.startTransfer(session_hash) @@ -280,7 +297,6 @@ @param session_hash(str): hash of the session must only be used in client mode """ - log.debug(_("Protocol init")) self.connection = defer.Deferred() # called when connection/auth is done if session_hash is not None: self.server_mode = False @@ -313,7 +329,6 @@ self.transport.write(struct.pack('!3B', SOCKS5_VER, 1, AUTHMECH_ANON)) def _parseNegotiation(self): - log.debug("_parseNegotiation") try: # Parse out data ver, nmethod = struct.unpack('!BB', self.buf[:2]) @@ -348,7 +363,6 @@ pass def _parseUserPass(self): - log.debug("_parseUserPass") try: # Parse out data ver, ulen = struct.unpack('BB', self.buf[:2]) @@ -370,14 +384,12 @@ pass def sendErrorReply(self, errorcode): - log.debug("sendErrorReply") # Any other address types are not supported result = struct.pack('!BBBBIH', SOCKS5_VER, errorcode, 0, 1, 0, 0) self.transport.write(result) self.transport.loseConnection() def _parseRequest(self): - log.debug("_parseRequest") try: # Parse out data and trim buffer accordingly ver, cmd, rsvd, self.addressType = struct.unpack('!BBBB', self.buf[:4]) @@ -420,7 +432,6 @@ return None def _makeRequest(self): - log.debug("_makeRequest") # sha1 = getSessionHash(self.data["from"], self.data["to"], self.sid) hash_ = self._session_hash request = struct.pack('!5B%dsH' % len(hash_), SOCKS5_VER, CMD_CONNECT, 0, ADDR_DOMAINNAME, len(hash_), hash_, 0) @@ -428,7 +439,6 @@ self.state = STATE_CLIENT_REQUEST def _parseRequestReply(self): - log.debug("_parseRequestReply") try: ver, rep, rsvd, self.addressType = struct.unpack('!BBBB', self.buf[:4]) # Ensure we actually support the requested address type @@ -472,8 +482,6 @@ self._startNegotiation() def connectRequested(self, addr, port): - log.debug("connectRequested") - # Check that this session is expected if not self.factory.addToSession(addr, self): self.sendErrorReply(REPLY_CONN_REFUSED) @@ -497,7 +505,6 @@ self.transport.loseConnection() def connectCompleted(self, remotehost, remoteport): - log.debug("connectCompleted") if self.addressType == ADDR_IPV4: result = struct.pack('!BBBBIH', SOCKS5_VER, REPLY_SUCCESS, 0, 1, remotehost, remoteport) elif self.addressType == ADDR_DOMAINNAME: @@ -871,18 +878,36 @@ candidate.factory.connector = connector return candidate.factory.connection + def connectCandidate(self, candidate, session_hash, delay=None, profile=C.PROF_KEY_NONE): + """"Connect to a candidate + + Connection will be done with a Socks5ClientFactory + + @param candidate(Candidate): candidate to connect to + @param session_hash(unicode): hash of the session + hash is the same as hostname computer in XEP-0065 § 5.3.2 #1 + @param delay(None, float): optional delay to wait before connection, in seconds + @param profile: %(doc_profile)s + @return (D): Deferred launched when TCP connection + Socks5 connection is done + """ + factory = Socks5ClientFactory(self, session_hash, profile) + candidate.factory = factory + if delay is None: + d = defer.succeed(candidate.host) + else: + d = sat_defer.DelayedDeferred(delay, candidate.host) + d.addCallback(reactor.connectTCP, candidate.port, factory) + d.addCallback(self._addConnector, candidate) + return d + def tryCandidates(self, candidates, session_hash, connection_cb=None, connection_eb=None, profile=C.PROF_KEY_NONE): defers_list = [] for candidate in candidates: - factory = Socks5ClientFactory(self, session_hash, profile) - candidate.factory = factory delay = CANDIDATE_DELAY * len(defers_list) if candidate.type == XEP_0065.TYPE_PROXY: delay += CANDIDATE_DELAY_PROXY - d = sat_defer.DelayedDeferred(delay, candidate.host) - d.addCallback(reactor.connectTCP, candidate.port, factory) - d.addCallback(self._addConnector, candidate) + d = self.connectCandidate(candidate, session_hash, delay, profile) if connection_cb is not None: d.addCallback(lambda dummy, candidate=candidate, profile=profile: connection_cb(candidate, profile)) if connection_eb is not None: diff -r 44854fb5d3b2 -r 37d4be4a9fed src/plugins/plugin_xep_0260.py --- a/src/plugins/plugin_xep_0260.py Sun Nov 08 14:44:33 2015 +0100 +++ b/src/plugins/plugin_xep_0260.py Sun Nov 08 14:44:33 2015 +0100 @@ -48,6 +48,10 @@ } +class ProxyError(Exception): + pass + + class XEP_0260(object): # TODO: udp handling @@ -127,6 +131,29 @@ defer.returnValue(transport_elt) + def _proxyActivatedCb(self, iq_result_elt, candidate, session, content_name, profile): + """Called when activation confirmation has been received from proxy + + cf XEP-0260 § 2.4 + """ + # now that the proxy is activated, we have to inform other peer + iq_elt, transport_elt = self._j.buildAction(self._j.A_TRANSPORT_INFO, session, content_name, profile) + activated_elt = transport_elt.addElement('activated') + activated_elt['cid'] = candidate.id + iq_elt.send + + def _proxyActivatedEb(self, stanza_error, candidate, session, content_name, profile): + """Called when activation error has been received from proxy + + cf XEP-0260 § 2.4 + """ + # TODO: fallback to IBB + # now that the proxy is activated, we have to inform other peer + iq_elt, transport_elt = self._j.buildAction(self._j.A_TRANSPORT_INFO, session, content_name, profile) + transport_elt.addElement('proxy-error') + iq_elt.send + return stanza_error + def _foundPeerCandidate(self, candidate, session, transport_data, content_name, client): """Called when the best candidate from other peer is found @@ -134,7 +161,7 @@ or None if no candidate is accessible @param session(dict): session data @param transport_data(dict): transport data - @param content_name(dict): name of the current content + @param content_name(unicode): name of the current content @param client(unicode): %(doc_client)s """ @@ -154,18 +181,18 @@ candidate_elt = transport_elt.addElement('candidate-used') candidate_elt['cid'] = candidate.id iq_elt.send() # TODO: check result stanza - content_data = session['contents'][content_name] - self._checkCandidates(session, content_data, transport_data, client) + self._checkCandidates(session, content_name, transport_data, client) - def _checkCandidates(self, session, content_data, transport_data, client): + def _checkCandidates(self, session, content_name, transport_data, client): """Called when a candidate has been choosed if we have both candidates, we select one, or fallback to an other transport @param session(dict): session data - @param content_data(dict): content data + @param content_name(unicode): name of the current content @param transport_data(dict): transport data @param client(unicode): %(doc_client)s """ + content_data = session['contents'][content_name] try: best_candidate = transport_data['best_candidate'] except KeyError: @@ -194,20 +221,90 @@ if choosed_candidate is None: log.warning(u"Socks5 negociation failed, we need to fallback to IBB") else: - if choosed_candidate==best_candidate: - who = u'our' - else: - who = u'other peer' - best_candidate.discard() + # best_peer_candidate was choosed from the candidates we have sent + # so our_candidate is true if choosed_candidate is peer_best_candidate + our_candidate = choosed_candidate==peer_best_candidate log.info(u"Socks5 negociation successful, {who} candidate will be used: {candidate}".format( - who = who, + who = u'our' if our_candidate else u'other peer', candidate = choosed_candidate)) del transport_data['best_candidate'] del transport_data['peer_best_candidate'] + + if choosed_candidate.type == self._s5b.TYPE_PROXY: + # the file transfer need to wait for proxy activation + # (see XEP-0260 § 2.4) + if our_candidate: + d = self._s5b.connectCandidate(choosed_candidate, transport_data['session_hash'], profile=client.profile) + d.addCallback(lambda dummy: choosed_candidate.activate(transport_data['sid'], session['peer_jid'], client)) + args = [choosed_candidate, session, content_name, client.profile] + d.addCallbacks(self._proxyActivatedCb, self._proxyActivatedEb, args, None, args) + else: + # this Deferred will be called when we'll receive activation confirmation from other peer + d = transport_data['activation_d'] = defer.Deferred() + else: + d = defer.succeed(None) + if content_data['senders'] == session['role']: - # we can now start the file transfer - choosed_candidate.startTransfer(transport_data['session_hash']) + # we can now start the file transfer (or start it after proxy activation) + d.addCallback(lambda dummy: choosed_candidate.startTransfer(transport_data['session_hash'])) + + def _candidateInfo(self, candidate_elt, session, content_name, transport_data, client): + """Called when best candidate has been received from peer (or if none is working) + + @param candidate_elt(domish.Element): candidate-used or candidate-error element + (see XEP-0260 §2.3) + @param session(dict): session data + @param content_name(unicode): name of the current content + @param transport_data(dict): transport data + @param client(unicode): %(doc_client)s + """ + if candidate_elt.name == 'candidate-error': + # candidate-error, no candidate worked + transport_data['peer_best_candidate'] = None + else: + # candidate-used, one candidate was choosed + try: + cid = candidate_elt.attributes['cid'] + except KeyError: + log.warning(u"No cid found in ") + raise exceptions.DataError + try: + candidate = (c for c in transport_data['candidates'] if c.id == cid).next() + except StopIteration: + log.warning(u"Given cid doesn't correspond to any known candidate !") + raise exceptions.DataError # TODO: send an error to other peer, and use better exception + except KeyError: + # a transport-info can also be intentionaly sent too early by other peer + # but there is little probability + log.error(u'"candidates" key doesn\'t exists in transport_data, it should at this point') + raise exceptions.InternalError + # at this point we have the candidate choosed by other peer + transport_data['peer_best_candidate'] = candidate + log.info(u"Other peer best candidate: {}".format(candidate)) + + del transport_data['candidates'] + self._checkCandidates(session, content_name, transport_data, client) + + def _proxyActivationInfo(self, proxy_elt, session, content_name, transport_data, client): + """Called when proxy has been activated (or has sent an error) + + @param proxy_elt(domish.Element): or element + (see XEP-0260 §2.4) + @param session(dict): session data + @param content_name(unicode): name of the current content + @param transport_data(dict): transport data + @param client(unicode): %(doc_client)s + """ + try: + activation_d = transport_data.pop('activation_d') + except KeyError: + log.warning(u"Received unexpected transport-info for proxy activation") + + if proxy_elt.name == 'activated': + activation_d.callback(None) + else: + activation_d.errback(ProxyError) @defer.inlineCallbacks def jingleHandler(self, action, session, content_name, transport_elt, profile): @@ -256,41 +353,22 @@ transport_elt = self._buildCandidates(session, candidates, sid, session_hash, client) elif action == self._j.A_TRANSPORT_INFO: - # other peer gave us its choosed candidate - try: - candidate_elt = transport_elt.elements(NS_JINGLE_S5B, 'candidate-used').next() - except StopIteration: - try: - candidate_elt = transport_elt.elements(NS_JINGLE_S5B, 'candidate-error').next() - except StopIteration: - log.warning(u"Unexpected transport element: {}".format(transport_elt.toXml())) - raise exceptions.DataError - else: - # candidate-error, no candidate worked - transport_data['peer_best_candidate'] = None - else: - # candidate-used, one candidate was choosed - try: - cid = candidate_elt.attributes['cid'] - except KeyError: - log.warning(u"No cid found in ") - raise exceptions.DataError - try: - candidate = (c for c in transport_data['candidates'] if c.id == cid).next() - except StopIteration: - log.warning(u"Given cid doesn't correspond to any known candidate !") - raise exceptions.DataError # TODO: send an error to other peer, and use better exception - except KeyError: - # a transport-info can also be intentionaly sent too early by other peer - # but there is little probability - log.error(u'"candidates" key doesn\'t exists in transport_data, it should at this point') - raise exceptions.InternalError - # at this point we have the candidate choosed by other peer - transport_data['peer_best_candidate'] = candidate - log.info(u"Other peer best candidate: {}".format(candidate)) + # transport-info can be about candidate or proxy activation + candidate_elt = None - del transport_data['candidates'] - self._checkCandidates(session, content_data, transport_data, client) + for method, names in ((self._candidateInfo, ('candidate-used', 'candidate-error')), + (self._proxyActivationInfo, ('activated', 'proxy-error'))): + for name in names: + try: + candidate_elt = transport_elt.elements(NS_JINGLE_S5B, name).next() + except StopIteration: + continue + else: + method(candidate_elt, session, content_name, transport_data, client) + break + + if candidate_elt is None: + log.warning(u"Unexpected transport element: {}".format(transport_elt.toXml())) else: log.warning(u"FIXME: unmanaged action {}".format(action))