# HG changeset patch # User Goffi # Date 1446498161 -3600 # Node ID dcce6381073373b24446d63cf0b3c9881d0f0795 # Parent 7cc29634b6efd29b1060795c65f56036a7f7b688 plugin XEP-0260: first draft diff -r 7cc29634b6ef -r dcce63810733 src/plugins/plugin_xep_0260.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/plugins/plugin_xep_0260.py Mon Nov 02 22:02:41 2015 +0100 @@ -0,0 +1,315 @@ +#!/usr/bin/python +# -*- coding: utf-8 -*- + +# SAT plugin for Jingle (XEP-0260) +# Copyright (C) 2009, 2010, 2011, 2012, 2013, 2014, 2015 Jérôme Poisson (goffi@goffi.org) + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + +from sat.core.i18n import _ +from sat.core.log import getLogger +log = getLogger(__name__) +from sat.core import exceptions +from wokkel import disco, iwokkel +from zope.interface import implements +from twisted.words.xish import domish +from twisted.words.protocols.jabber import jid +from twisted.internet import defer +import uuid + +try: + from twisted.words.protocols.xmlstream import XMPPHandler +except ImportError: + from wokkel.subprotocols import XMPPHandler + + +NS_JINGLE_S5B = 'urn:xmpp:jingle:transports:s5b:1' + +PLUGIN_INFO = { + "name": "Jingle SOCKS5 Bytestreams", + "import_name": "XEP-0260", + "type": "XEP", + "protocols": ["XEP-0260"], + "dependencies": ["XEP-0166", "XEP-0065"], + "main": "XEP_0260", + "handler": "yes", + "description": _("""Implementation of Jingle SOCKS5 Bytestreams""") +} + + +class XEP_0260(object): + # TODO: udp handling + + def __init__(self, host): + log.info(_("plugin Jingle SOCKS5 Bytestreams")) + self.host = host + self._j = host.plugins["XEP-0166"] # shortcut to access jingle + self._s5b = host.plugins["XEP-0065"] # and socks5 bytestream + self._j.registerTransport(NS_JINGLE_S5B, self._j.TRANSPORT_STREAMING, self, 100) + + def getHandler(self, profile): + return XEP_0260_handler() + + def _parseCandidates(self, transport_elt): + """Parse elements + + @param transport_elt(domish.Element): parent element + @return (list[plugin_xep_0065.Candidate): list of parsed candidates + """ + candidates = [] + for candidate_elt in transport_elt.elements(NS_JINGLE_S5B, 'candidate'): + try: + cid = candidate_elt['cid'] + host = candidate_elt['host'] + jid_= jid.JID(candidate_elt['jid']) + port = int(candidate_elt.getAttribute('port', 1080)) + priority = int(candidate_elt['priority']) + type_ = candidate_elt.getAttribute('type', self._s5b.TYPE_DIRECT) + except (KeyError, ValueError): + raise exceptions.DataError() + candidate = self._s5b.Candidate(host, port, type_, priority, jid_, cid) + candidates.append(candidate) + # self._s5b.registerCandidate(candidate) + return candidates + + def _buildCandidates(self, session, candidates, sid, session_hash, client, mode=None): + """Build element with candidates + + @param session(dict): jingle session data + @param candidates(iterator[plugin_xep_0065.Candidate]): iterator of candidates to add + @param sid(unicode): transport stream id + @param client: %(doc_client)s + @param mode(str, None): 'tcp' or 'udp', or None to have no attribute + @return (domish.Element): parent element where elements must be added + """ + proxy = next((candidate for candidate in candidates if candidate.type == self._s5b.TYPE_PROXY), None) + transport_elt = domish.Element((NS_JINGLE_S5B, "transport")) + transport_elt['sid'] = sid + if proxy is not None: + transport_elt['dstaddr'] = session_hash + if mode is not None: + transport_elt['mode'] = 'tcp' # XXX: we only manage tcp for now + + for candidate in candidates: + log.debug(u"Adding candidate: {}".format(candidate)) + candidate_elt = transport_elt.addElement('candidate', NS_JINGLE_S5B) + if candidate.id is None: + candidate.id = unicode(uuid.uuid4()) + candidate_elt['cid'] = candidate.id + candidate_elt['host'] = candidate.host + candidate_elt['jid'] = candidate.jid.full() + candidate_elt['port'] = unicode(candidate.port) + candidate_elt['priority'] = unicode(candidate.priority) + candidate_elt['type'] = candidate.type + return transport_elt + + @defer.inlineCallbacks + def jingleSessionInit(self, session, content_name, profile): + client = self.host.getClient(profile) + content_data = session['contents'][content_name] + transport_data = content_data['transport_data'] + sid = transport_data['sid'] = unicode(uuid.uuid4()) + session_hash = transport_data['session_hash'] = self._s5b.getSessionHash(client.jid, session['to_jid'], sid) + candidates = transport_data['candidates'] = yield self._s5b.getCandidates(profile) + mode = 'tcp' # XXX: we only manage tcp for now + transport_elt = self._buildCandidates(session, candidates, sid, session_hash, client, mode) + + defer.returnValue(transport_elt) + + def _foundPeerCandidate(self, candidate, session, transport_data, content_name, client): + """Called when the best candidate from other peer is found + + @param candidate(XEP_0065.Candidate, None): selected candidate, + or None if no candidate is accessible + @param session(dict): session data + @param transport_data(dict): transport data + @param content_name(dict): name of the current content + @param client(unicode): %(doc_client)s + """ + + transport_data['best_candidate'] = candidate + # we need to disconnect all non selected candidates before removing them + for c in transport_data['peer_candidates']: + if c is None or c is candidate: + continue + c.discard() + del transport_data['peer_candidates'] + iq_elt, transport_elt = self._j.buildAction(self._j.A_TRANSPORT_INFO, session, content_name, client.profile) + if candidate is None: + log.warning(u"Can't connect to any peer candidate") + candidate_elt = transport_elt.addElement('candidate-error') + else: + log.info(u"Found best peer candidate: {}".format(unicode(candidate))) + candidate_elt = transport_elt.addElement('candidate-used') + candidate_elt['cid'] = candidate.id + iq_elt.send() # TODO: check result stanza + content_data = session['contents'][content_name] + self._checkCandidates(session, content_data, transport_data, client) + + def _checkCandidates(self, session, content_data, transport_data, client): + """Called when a candidate has been choosed + + if we have both candidates, we select one, or fallback to an other transport + @param session(dict): session data + @param content_data(dict): content data + @param transport_data(dict): transport data + @param client(unicode): %(doc_client)s + """ + try: + best_candidate = transport_data['best_candidate'] + except KeyError: + # we have not our best candidate yet + return + try: + peer_best_candidate = transport_data['peer_best_candidate'] + except KeyError: + # we have not peer best candidate yet + return + + # at this point we have both candidates, it's time to choose one + if best_candidate is None or peer_best_candidate is None: + choosed_candidate = best_candidate or peer_best_candidate + else: + if best_candidate.priority == peer_best_candidate.priority: + # same priority, we choose initiator one according to XEP-0260 §2.4 #4 + log.debug(u"Candidates have same priority, we choose the initiator one") + if session['initiator'] == client.jid: + choosed_candidate = best_candidate + else: + choosed_candidate = peer_best_candidate + else: + choosed_candidate = max(best_candidate, peer_best_candidate, key=lambda c:c.priority) + + if choosed_candidate is None: + log.warning(u"Socks5 negociation failed, we need to fallback to IBB") + else: + if choosed_candidate==best_candidate: + who = u'our' + else: + who = u'other peer' + best_candidate.discard() + + log.info(u"Socks5 negociation successful, {who} candidate will be used: {candidate}".format( + who = who, + candidate = choosed_candidate)) + del transport_data['best_candidate'] + del transport_data['peer_best_candidate'] + if content_data['senders'] == session['role']: + # we can now start the file transfer + choosed_candidate.startTransfer(transport_data['session_hash']) + + @defer.inlineCallbacks + def jingleHandler(self, action, session, content_name, transport_elt, profile): + client = self.host.getClient(profile) + content_data = session['contents'][content_name] + transport_data = content_data['transport_data'] + + if action in (self._j.A_ACCEPTED_ACK,): + pass + + elif action == self._j.A_SESSION_ACCEPT: + # initiator side, we select a candidate in the ones sent by responder + assert 'peer_candidates' not in transport_data + transport_data['peer_candidates'] = self._parseCandidates(transport_elt) + + # elif action == self._j.A_START: + elif action == self._j.A_START: + session_hash = transport_data['session_hash'] + peer_candidates = transport_data['peer_candidates'] + file_obj = content_data['file_obj'] + stream_d = self._s5b.registerHash(session_hash, file_obj, profile) + args = [session, content_name, profile] + stream_d.addCallbacks(self._streamCb, self._streamEb, args, None, args) + d = self._s5b.getBestCandidate(peer_candidates, session_hash, profile) + d.addCallback(self._foundPeerCandidate, session, transport_data, content_name, client) + + elif action == self._j.A_SESSION_INITIATE: + # responder side, we select a candidate in the ones sent by initiator + # and we give our candidates + assert 'peer_candidates' not in transport_data + sid = transport_data['sid'] = transport_elt['sid'] + session_hash = transport_data['session_hash'] = self._s5b.getSessionHash(session['to_jid'], client.jid, sid) + peer_candidates = transport_data['peer_candidates'] = self._parseCandidates(transport_elt) + file_obj = content_data['file_obj'] + stream_d = self._s5b.registerHash(session_hash, file_obj, profile) + args = [session, content_name, profile] + stream_d.addCallbacks(self._streamCb, self._streamEb, args, None, args) + d = self._s5b.getBestCandidate(peer_candidates, session_hash, profile) + d.addCallback(self._foundPeerCandidate, session, transport_data, content_name, client) + candidates = yield self._s5b.getCandidates(profile) + # we remove duplicate candidates + candidates = [candidate for candidate in candidates if candidate not in peer_candidates] + + transport_data['candidates'] = candidates + # we can now build a new element with our candidates + transport_elt = self._buildCandidates(session, candidates, sid, session_hash, client) + + elif action == self._j.A_TRANSPORT_INFO: + # other peer gave us its choosed candidate + try: + candidate_elt = transport_elt.elements(NS_JINGLE_S5B, 'candidate-used').next() + except StopIteration: + try: + candidate_elt = transport_elt.elements(NS_JINGLE_S5B, 'candidate-error').next() + except StopIteration: + log.warning(u"Unexpected transport element: {}".format(transport_elt.toXml())) + raise exceptions.DataError + else: + # candidate-error, no candidate worked + transport_data['peer_best_candidate'] = None + else: + # candidate-used, one candidate was choosed + try: + cid = candidate_elt.attributes['cid'] + except KeyError: + log.warning(u"No cid found in ") + raise exceptions.DataError + try: + candidate = (c for c in transport_data['candidates'] if c.id == cid).next() + except StopIteration: + log.warning(u"Given cid doesn't correspond to any known candidate !") + raise exceptions.DataError # TODO: send an error to other peer, and use better exception + except KeyError: + # a transport-info can also be intentionaly sent too early by other peer + # but there is little probability + log.error(u'"candidates" key doesn\'t exists in transport_data, it should at this point') + raise exceptions.InternalError + # at this point we have the candidate choosed by other peer + transport_data['peer_best_candidate'] = candidate + log.info(u"Other peer best candidate: {}".format(candidate)) + + del transport_data['candidates'] + self._checkCandidates(session, content_data, transport_data, client) + + else: + log.warning(u"FIXME: unmanaged action {}".format(action)) + + defer.returnValue(transport_elt) + + def _streamCb(self, dummy, session, content_name, profile): + self._j.contentTerminate(session, content_name, profile=profile) + + def _streamEb(self, failure, session, content_name, profile): + log.warning(u"Error while streaming through s5b: {}".format(failure)) + self._j.contentTerminate(session, content_name, reason=self._j.REASON_FAILED_TRANSPORT, profile=profile) + + +class XEP_0260_handler(XMPPHandler): + implements(iwokkel.IDisco) + + def getDiscoInfo(self, requestor, target, nodeIdentifier=''): + return [disco.DiscoFeature(NS_JINGLE_S5B)] + + def getDiscoItems(self, requestor, target, nodeIdentifier=''): + return []