Mercurial > libervia-backend
view src/plugins/plugin_xep_0260.py @ 1569:44854fb5d3b2
plugin XEP-0065: fixed CHUNK_SIZE to 4096 to avoid wild disconnection by some proxies (Prosody's proxy65 disconnects if it receives bigger chunks)
author | Goffi <goffi@goffi.org> |
---|---|
date | Sun, 08 Nov 2015 14:44:33 +0100 (2015-11-08) |
parents | 268fda4236ca |
children | 37d4be4a9fed |
line wrap: on
line source
#!/usr/bin/python
# -*- coding: utf-8 -*-

# SAT plugin for Jingle (XEP-0260)
# Copyright (C) 2009, 2010, 2011, 2012, 2013, 2014, 2015 Jérôme Poisson (goffi@goffi.org)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

from sat.core.i18n import _
from sat.core.log import getLogger
log = getLogger(__name__)
from sat.core import exceptions
from wokkel import disco, iwokkel
from zope.interface import implements
from twisted.words.xish import domish
from twisted.words.protocols.jabber import jid
from twisted.internet import defer
import uuid

try:
    from twisted.words.protocols.xmlstream import XMPPHandler
except ImportError:
    from wokkel.subprotocols import XMPPHandler


NS_JINGLE_S5B = 'urn:xmpp:jingle:transports:s5b:1'

PLUGIN_INFO = {
    "name": "Jingle SOCKS5 Bytestreams",
    "import_name": "XEP-0260",
    "type": "XEP",
    "protocols": ["XEP-0260"],
    "dependencies": ["XEP-0166", "XEP-0065"],
    "main": "XEP_0260",
    "handler": "yes",
    "description": _("""Implementation of Jingle SOCKS5 Bytestreams""")
}


class XEP_0260(object):
    """Jingle transport using SOCKS5 Bytestreams (XEP-0260)

    Candidates handling and streaming are delegated to the XEP-0065 plugin,
    Jingle session handling to the XEP-0166 plugin.
    """
    # TODO: udp handling

    def __init__(self, host):
        """Keep shortcuts to needed plugins and register this transport

        @param host: SàT host instance, its "plugins" mapping must contain
            "XEP-0166" and "XEP-0065"
        """
        log.info(_("plugin Jingle SOCKS5 Bytestreams"))
        self.host = host
        self._j = host.plugins["XEP-0166"]  # shortcut to access jingle
        self._s5b = host.plugins["XEP-0065"]  # and socks5 bytestream
        # NOTE(review): 100 is presumably the transport priority, confirm with XEP-0166 plugin
        self._j.registerTransport(NS_JINGLE_S5B, self._j.TRANSPORT_STREAMING, self, 100)

    def getHandler(self, profile):
        """Return a new handler advertising the feature in service discovery

        @param profile: %(doc_profile)s (not used here)
        @return (XEP_0260_handler): handler instance
        """
        return XEP_0260_handler()

    def _parseCandidates(self, transport_elt):
        """Parse <candidate> elements

        @param transport_elt(domish.Element): parent <transport> element
        @return (list[plugin_xep_0065.Candidate]): list of parsed candidates
        @raise exceptions.DataError: a mandatory attribute is missing,
            or an attribute value is invalid (non integer port/priority, malformed jid)
        """
        candidates = []
        for candidate_elt in transport_elt.elements(NS_JINGLE_S5B, 'candidate'):
            try:
                cid = candidate_elt['cid']
                host = candidate_elt['host']
                jid_ = jid.JID(candidate_elt['jid'])
                port = int(candidate_elt.getAttribute('port', 1080))
                priority = int(candidate_elt['priority'])
                type_ = candidate_elt.getAttribute('type', self._s5b.TYPE_DIRECT)
            except (KeyError, ValueError, jid.InvalidFormat):
                # data come from the other peer, any malformed attribute
                # (including the jid) invalidates the whole element
                raise exceptions.DataError()
            candidates.append(self._s5b.Candidate(host, port, type_, priority, jid_, cid))
        return candidates

    def _buildCandidates(self, session, candidates, sid, session_hash, client, mode=None):
        """Build <transport> element with candidates

        Candidates without id get a newly generated one (the candidate object is modified).
        @param session(dict): jingle session data
        @param candidates(iterable[plugin_xep_0065.Candidate]): candidates to add
        @param sid(unicode): transport stream id
        @param session_hash(unicode): hash of the session, used as "dstaddr"
            when one of the candidates is a proxy
        @param client: %(doc_client)s
        @param mode(str, None): 'tcp' or 'udp', or None to have no attribute
        @return (domish.Element): <transport> element with its <candidate> children
        """
        # candidates are walked twice (proxy detection then serialisation),
        # so a one-shot iterator must be materialised first
        candidates = list(candidates)
        transport_elt = domish.Element((NS_JINGLE_S5B, "transport"))
        transport_elt['sid'] = sid
        if any(candidate.type == self._s5b.TYPE_PROXY for candidate in candidates):
            transport_elt['dstaddr'] = session_hash
        if mode is not None:
            transport_elt['mode'] = 'tcp'  # XXX: we only manage tcp for now

        for candidate in candidates:
            log.debug(u"Adding candidate: {}".format(candidate))
            candidate_elt = transport_elt.addElement('candidate', NS_JINGLE_S5B)
            if candidate.id is None:
                candidate.id = unicode(uuid.uuid4())
            candidate_elt['cid'] = candidate.id
            candidate_elt['host'] = candidate.host
            candidate_elt['jid'] = candidate.jid.full()
            candidate_elt['port'] = unicode(candidate.port)
            candidate_elt['priority'] = unicode(candidate.priority)
            candidate_elt['type'] = candidate.type
        return transport_elt

    @defer.inlineCallbacks
    def jingleSessionInit(self, session, content_name, profile):
        """Prepare transport data and <transport> element for a session we initiate

        "sid", "session_hash" and "candidates" are stored in the transport data
        of the content.
        @param session(dict): jingle session data
        @param content_name(unicode): name of the content using this transport
        @param profile: %(doc_profile)s
        @return (D(domish.Element)): <transport> element with our candidates
        """
        client = self.host.getClient(profile)
        content_data = session['contents'][content_name]
        transport_data = content_data['transport_data']
        sid = transport_data['sid'] = unicode(uuid.uuid4())
        session_hash = transport_data['session_hash'] = self._s5b.getSessionHash(
            client.jid, session['peer_jid'], sid)
        candidates = transport_data['candidates'] = yield self._s5b.getCandidates(profile)
        mode = 'tcp'  # XXX: we only manage tcp for now
        transport_elt = self._buildCandidates(session, candidates, sid, session_hash, client, mode)
        defer.returnValue(transport_elt)

    def _foundPeerCandidate(self, candidate, session, transport_data, content_name, client):
        """Called when the best candidate from other peer is found

        The result is stored in transport data, other peer candidates are discarded,
        and a transport-info action is sent to notify the other peer.
        @param candidate(XEP_0065.Candidate, None): selected candidate,
            or None if no candidate is accessible
        @param session(dict): session data
        @param transport_data(dict): transport data
        @param content_name(unicode): name of the current content
        @param client: %(doc_client)s
        """
        transport_data['best_candidate'] = candidate
        # we need to disconnect all non selected candidates before removing them
        for c in transport_data['peer_candidates']:
            if c is None or c is candidate:
                continue
            c.discard()
        del transport_data['peer_candidates']

        iq_elt, transport_elt = self._j.buildAction(
            self._j.A_TRANSPORT_INFO, session, content_name, client.profile)
        if candidate is None:
            log.warning(u"Can't connect to any peer candidate")
            transport_elt.addElement('candidate-error')
        else:
            log.info(u"Found best peer candidate: {}".format(unicode(candidate)))
            candidate_elt = transport_elt.addElement('candidate-used')
            candidate_elt['cid'] = candidate.id
        iq_elt.send()  # TODO: check result stanza

        content_data = session['contents'][content_name]
        self._checkCandidates(session, content_data, transport_data, client)

    def _checkCandidates(self, session, content_data, transport_data, client):
        """Called when a candidate has been chosen

        If we have both candidates (the one we selected and the one selected by
        the other peer), we select one; nothing is done while one of them is missing.
        When no candidate at all is usable, a warning is logged and no transfer is started.
        @param session(dict): session data
        @param content_data(dict): content data
        @param transport_data(dict): transport data
        @param client: %(doc_client)s
        """
        try:
            best_candidate = transport_data['best_candidate']
            peer_best_candidate = transport_data['peer_best_candidate']
        except KeyError:
            # one of the 2 selections is not done yet, we'll be called again
            return

        # at this point we have both candidates, it's time to choose one
        if best_candidate is None:
            choosed_candidate = peer_best_candidate
        elif peer_best_candidate is None:
            choosed_candidate = best_candidate
        elif best_candidate.priority == peer_best_candidate.priority:
            # same priority, we choose initiator one according to XEP-0260 §2.4 #4
            log.debug(u"Candidates have same priority, we choose the initiator one")
            if session['initiator'] == client.jid:
                choosed_candidate = best_candidate
            else:
                choosed_candidate = peer_best_candidate
        else:
            choosed_candidate = max(best_candidate, peer_best_candidate,
                                    key=lambda c: c.priority)

        del transport_data['best_candidate']
        del transport_data['peer_best_candidate']

        if choosed_candidate is None:
            # both sides failed: there is nothing to discard nor to start
            log.warning(u"Socks5 negociation failed, we need to fallback to IBB")
            return

        if best_candidate is not None and choosed_candidate == best_candidate:
            who = u'our'
        else:
            who = u'other peer'
            # our selection is not used, its connection must be closed;
            # it is None when we could not reach any peer candidate
            if best_candidate is not None:
                best_candidate.discard()

        log.info(u"Socks5 negociation successful, {who} candidate will be used: {candidate}".format(
            who = who,
            candidate = choosed_candidate))

        if content_data['senders'] == session['role']:
            # we can now start the file transfer
            choosed_candidate.startTransfer(transport_data['session_hash'])

    def _selectPeerCandidate(self, session, content_name, content_data, transport_data,
                             client, profile):
        """Register the session hash and launch selection of the best peer candidate

        "session_hash" and "peer_candidates" must be set in transport_data,
        and "file_obj" in content_data.
        @param session(dict): session data
        @param content_name(unicode): name of the current content
        @param content_data(dict): content data
        @param transport_data(dict): transport data
        @param client: %(doc_client)s
        @param profile: %(doc_profile)s
        """
        session_hash = transport_data['session_hash']
        peer_candidates = transport_data['peer_candidates']
        file_obj = content_data['file_obj']
        stream_d = self._s5b.registerHash(session_hash, file_obj, profile)
        args = [session, content_name, profile]
        stream_d.addCallbacks(self._streamCb, self._streamEb, args, None, args)
        d = self._s5b.getBestCandidate(peer_candidates, session_hash, profile)
        d.addCallback(self._foundPeerCandidate, session, transport_data, content_name, client)

    @defer.inlineCallbacks
    def jingleHandler(self, action, session, content_name, transport_elt, profile):
        """Handle a jingle action for this transport

        @param action: jingle action, compared to the A_* constants of XEP-0166 plugin
        @param session(dict): session data
        @param content_name(unicode): name of the current content
        @param transport_elt(domish.Element): <transport> element linked to the action
        @param profile: %(doc_profile)s
        @return (D(domish.Element)): the given <transport> element, except for
            session-initiate where a new one with our candidates is returned
        @raise exceptions.DataError: received transport-info is invalid
        @raise exceptions.InternalError: a transport-info with <candidate-used>
            is received while our candidates are not known
        """
        client = self.host.getClient(profile)
        content_data = session['contents'][content_name]
        transport_data = content_data['transport_data']

        if action in (self._j.A_ACCEPTED_ACK,):
            pass

        elif action == self._j.A_SESSION_ACCEPT:
            # initiator side, we select a candidate in the ones sent by responder
            assert 'peer_candidates' not in transport_data
            transport_data['peer_candidates'] = self._parseCandidates(transport_elt)

        elif action == self._j.A_START:
            self._selectPeerCandidate(session, content_name, content_data, transport_data,
                                      client, profile)

        elif action == self._j.A_SESSION_INITIATE:
            # responder side, we select a candidate in the ones sent by initiator
            # and we give our candidates
            assert 'peer_candidates' not in transport_data
            sid = transport_data['sid'] = transport_elt['sid']
            session_hash = transport_data['session_hash'] = self._s5b.getSessionHash(
                session['peer_jid'], client.jid, sid)
            peer_candidates = transport_data['peer_candidates'] = self._parseCandidates(transport_elt)
            self._selectPeerCandidate(session, content_name, content_data, transport_data,
                                      client, profile)
            candidates = yield self._s5b.getCandidates(profile)
            # we remove duplicate candidates
            candidates = [candidate for candidate in candidates
                          if candidate not in peer_candidates]
            transport_data['candidates'] = candidates
            # we can now build a new <transport> element with our candidates
            transport_elt = self._buildCandidates(session, candidates, sid, session_hash, client)

        elif action == self._j.A_TRANSPORT_INFO:
            # other peer gave us its chosen candidate
            candidate_elt = next(transport_elt.elements(NS_JINGLE_S5B, 'candidate-used'), None)
            if candidate_elt is None:
                error_elt = next(transport_elt.elements(NS_JINGLE_S5B, 'candidate-error'), None)
                if error_elt is None:
                    log.warning(u"Unexpected transport element: {}".format(transport_elt.toXml()))
                    raise exceptions.DataError
                # candidate-error, no candidate worked
                transport_data['peer_best_candidate'] = None
            else:
                # candidate-used, one candidate was chosen
                try:
                    cid = candidate_elt.attributes['cid']
                except KeyError:
                    log.warning(u"No cid found in <candidate-used>")
                    raise exceptions.DataError
                try:
                    candidates = transport_data['candidates']
                except KeyError:
                    # a transport-info can also be intentionally sent too early by other peer
                    # but there is little probability
                    log.error(u'"candidates" key doesn\'t exists in transport_data, it should at this point')
                    raise exceptions.InternalError
                candidate = next((c for c in candidates if c.id == cid), None)
                if candidate is None:
                    log.warning(u"Given cid doesn't correspond to any known candidate !")
                    raise exceptions.DataError  # TODO: send an error to other peer, and use better exception
                # at this point we have the candidate chosen by other peer
                transport_data['peer_best_candidate'] = candidate
                log.info(u"Other peer best candidate: {}".format(candidate))

            # our candidates are not needed anymore; they may be missing
            # if a candidate-error has been received before they were set
            transport_data.pop('candidates', None)
            self._checkCandidates(session, content_data, transport_data, client)

        else:
            log.warning(u"FIXME: unmanaged action {}".format(action))

        defer.returnValue(transport_elt)

    def _streamCb(self, dummy, session, content_name, profile):
        """Terminate the content when the stream Deferred succeeds

        @param dummy: result of the Deferred (ignored)
        @param session(dict): session data
        @param content_name(unicode): name of the current content
        @param profile: %(doc_profile)s
        """
        self._j.contentTerminate(session, content_name, profile=profile)

    def _streamEb(self, failure, session, content_name, profile):
        """Terminate the content with a failed transport reason when the stream Deferred fails

        @param failure(failure.Failure): error raised while streaming
        @param session(dict): session data
        @param content_name(unicode): name of the current content
        @param profile: %(doc_profile)s
        """
        log.warning(u"Error while streaming through s5b: {}".format(failure))
        self._j.contentTerminate(session, content_name,
                                 reason=self._j.REASON_FAILED_TRANSPORT, profile=profile)


class XEP_0260_handler(XMPPHandler):
    """Handler advertising Jingle SOCKS5 Bytestreams support in service discovery"""
    implements(iwokkel.IDisco)

    def getDiscoInfo(self, requestor, target, nodeIdentifier=''):
        """Return the disco feature of this transport, whatever the arguments are"""
        return [disco.DiscoFeature(NS_JINGLE_S5B)]

    def getDiscoItems(self, requestor, target, nodeIdentifier=''):
        """Return an empty list: this plugin exposes no disco item"""
        return []