Mercurial > libervia-backend
view src/plugins/plugin_xep_0260.py @ 1596:b7ee113183fc
jp: better profile commands:
- new "profile/default" command
- info doesn't show password anymore by default, it needs to be explicitly requested
- info and modify don't need to connect anymore
- modify can now set default profile. As use_profile is set, at least a profile session needs to be started even when it would not be technically mandatory (if just setting the profile as default is needed). But this option should not be used often, and it's not a big side effect, so I don't feel the need to create a new dedicated command, or to do complicated checks to avoid the session start.
author | Goffi <goffi@goffi.org> |
---|---|
date | Sat, 14 Nov 2015 19:18:10 +0100 |
parents | c668081eba1c |
children | 25906c0dbc63 |
line wrap: on
line source
#!/usr/bin/python
# -*- coding: utf-8 -*-

# SAT plugin for Jingle (XEP-0260)
# Copyright (C) 2009, 2010, 2011, 2012, 2013, 2014, 2015 Jérôme Poisson (goffi@goffi.org)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

from sat.core.i18n import _
from sat.core.log import getLogger
log = getLogger(__name__)
from sat.core import exceptions
from wokkel import disco, iwokkel
from zope.interface import implements
from twisted.words.xish import domish
from twisted.words.protocols.jabber import jid
from twisted.internet import defer
import uuid

try:
    from twisted.words.protocols.xmlstream import XMPPHandler
except ImportError:
    from wokkel.subprotocols import XMPPHandler


NS_JINGLE_S5B = 'urn:xmpp:jingle:transports:s5b:1'

PLUGIN_INFO = {
    "name": "Jingle SOCKS5 Bytestreams",
    "import_name": "XEP-0260",
    "type": "XEP",
    "protocols": ["XEP-0260"],
    "dependencies": ["XEP-0166", "XEP-0065"],
    "main": "XEP_0260",
    "handler": "yes",
    "description": _("""Implementation of Jingle SOCKS5 Bytestreams""")
}


class ProxyError(Exception):
    """Raised (through a Deferred) when the peer reports a proxy activation error"""
    pass


class XEP_0260(object):
    """Jingle transport using SOCKS5 bytestreams (XEP-0260)

    The plugin registers itself as a streaming transport on the Jingle plugin
    (XEP-0166) and delegates the SOCKS5 work to the XEP-0065 plugin.
    """
    # TODO: udp handling

    def __init__(self, host):
        """Register the transport on the Jingle plugin

        @param host: SàT host instance, used to access plugins and clients
        """
        log.info(_("plugin Jingle SOCKS5 Bytestreams"))
        self.host = host
        self._j = host.plugins["XEP-0166"]  # shortcut to access jingle
        self._s5b = host.plugins["XEP-0065"]  # and socks5 bytestream
        self._j.registerTransport(NS_JINGLE_S5B, self._j.TRANSPORT_STREAMING, self, 100)

    def getHandler(self, profile):
        """Return the protocol handler advertising the feature in disco"""
        return XEP_0260_handler()

    def _parseCandidates(self, transport_elt):
        """Parse <candidate> elements

        @param transport_elt(domish.Element): parent <transport> element
        @return (list[plugin_xep_0065.Candidate]): list of parsed candidates
        @raise exceptions.DataError: a mandatory attribute is missing or a value
            (port, priority, jid) is malformed
        """
        candidates = []
        for candidate_elt in transport_elt.elements(NS_JINGLE_S5B, 'candidate'):
            try:
                cid = candidate_elt['cid']
                host = candidate_elt['host']
                jid_ = jid.JID(candidate_elt['jid'])
                port = int(candidate_elt.getAttribute('port', 1080))
                priority = int(candidate_elt['priority'])
                type_ = candidate_elt.getAttribute('type', self._s5b.TYPE_DIRECT)
            except (KeyError, ValueError, jid.InvalidFormat):
                # data come from the other peer: a malformed jid must be reported
                # the same way as any other invalid attribute
                raise exceptions.DataError()
            candidate = self._s5b.Candidate(host, port, type_, priority, jid_, cid)
            candidates.append(candidate)
            # self._s5b.registerCandidate(candidate)
        return candidates

    def _buildCandidates(self, session, candidates, sid, session_hash, client, mode=None):
        """Build <transport> element with candidates

        @param session(dict): jingle session data
        @param candidates(iterator[plugin_xep_0065.Candidate]): iterator of candidates to add
            candidates without id get a freshly generated one
        @param sid(unicode): transport stream id
        @param session_hash(unicode): hash of the session, set as "dstaddr"
            when at least one candidate is a proxy
        @param client: %(doc_client)s
        @param mode(str, None): 'tcp' or 'udp', or None to have no attribute
        @return (domish.Element): parent <transport> element where <candidate> elements must be added
        """
        # candidates are walked twice (proxy lookup then serialisation), so we
        # materialise them to be safe with one-shot iterators
        candidates = list(candidates)
        proxy = next((candidate for candidate in candidates
                      if candidate.type == self._s5b.TYPE_PROXY), None)
        transport_elt = domish.Element((NS_JINGLE_S5B, "transport"))
        transport_elt['sid'] = sid
        if proxy is not None:
            transport_elt['dstaddr'] = session_hash
        if mode is not None:
            transport_elt['mode'] = 'tcp'  # XXX: we only manage tcp for now

        for candidate in candidates:
            log.debug(u"Adding candidate: {}".format(candidate))
            candidate_elt = transport_elt.addElement('candidate', NS_JINGLE_S5B)
            if candidate.id is None:
                candidate.id = unicode(uuid.uuid4())
            candidate_elt['cid'] = candidate.id
            candidate_elt['host'] = candidate.host
            candidate_elt['jid'] = candidate.jid.full()
            candidate_elt['port'] = unicode(candidate.port)
            candidate_elt['priority'] = unicode(candidate.priority)
            candidate_elt['type'] = candidate.type
        return transport_elt

    @defer.inlineCallbacks
    def jingleSessionInit(self, session, content_name, profile):
        """Prepare transport data and build the initial <transport> element

        "sid", "session_hash" and "candidates" are stored in the transport data
        of the content.
        @param session(dict): jingle session data
        @param content_name(unicode): name of the current content
        @param profile: %(doc_profile)s
        @return (D(domish.Element)): <transport> element with our candidates
        """
        client = self.host.getClient(profile)
        content_data = session['contents'][content_name]
        transport_data = content_data['transport_data']
        sid = transport_data['sid'] = unicode(uuid.uuid4())
        session_hash = transport_data['session_hash'] = self._s5b.getSessionHash(
            client.jid, session['peer_jid'], sid)
        candidates = transport_data['candidates'] = yield self._s5b.getCandidates(profile)
        mode = 'tcp'  # XXX: we only manage tcp for now
        transport_elt = self._buildCandidates(session, candidates, sid, session_hash, client, mode)

        defer.returnValue(transport_elt)

    def _proxyActivatedCb(self, iq_result_elt, candidate, session, content_name, profile):
        """Called when activation confirmation has been received from proxy

        cf XEP-0260 § 2.4
        """
        # now that the proxy is activated, we have to inform other peer
        iq_elt, transport_elt = self._j.buildAction(
            self._j.A_TRANSPORT_INFO, session, content_name, profile)
        activated_elt = transport_elt.addElement('activated')
        activated_elt['cid'] = candidate.id
        iq_elt.send()

    def _proxyActivatedEb(self, stanza_error, candidate, session, content_name, profile):
        """Called when activation error has been received from proxy

        cf XEP-0260 § 2.4
        @return: the received error, so it is propagated to the next errback
        """
        # TODO: fallback to IBB
        # the proxy activation has failed, we have to inform other peer
        iq_elt, transport_elt = self._j.buildAction(
            self._j.A_TRANSPORT_INFO, session, content_name, profile)
        transport_elt.addElement('proxy-error')
        iq_elt.send()
        return stanza_error

    def _foundPeerCandidate(self, candidate, session, transport_data, content_name, client):
        """Called when the best candidate from other peer is found

        @param candidate(XEP_0065.Candidate, None): selected candidate,
            or None if no candidate is accessible
        @param session(dict): session data
        @param transport_data(dict): transport data
        @param content_name(unicode): name of the current content
        @param client(unicode): %(doc_client)s
        """
        transport_data['best_candidate'] = candidate
        # we need to disconnect all non selected candidates before removing them
        for c in transport_data['peer_candidates']:
            if c is None or c is candidate:
                continue
            c.discard()
        del transport_data['peer_candidates']

        iq_elt, transport_elt = self._j.buildAction(
            self._j.A_TRANSPORT_INFO, session, content_name, client.profile)
        if candidate is None:
            log.warning(u"Can't connect to any peer candidate")
            candidate_elt = transport_elt.addElement('candidate-error')
        else:
            log.info(u"Found best peer candidate: {}".format(unicode(candidate)))
            candidate_elt = transport_elt.addElement('candidate-used')
            candidate_elt['cid'] = candidate.id
        iq_elt.send()  # TODO: check result stanza
        self._checkCandidates(session, content_name, transport_data, client)

    def _checkCandidates(self, session, content_name, transport_data, client):
        """Called when a candidate has been choosed

        if we have both candidates, we select one, or fallback to an other transport
        Nothing is done until both "best_candidate" and "peer_best_candidate"
        are present in transport_data.
        @param session(dict): session data
        @param content_name(unicode): name of the current content
        @param transport_data(dict): transport data
        @param client(unicode): %(doc_client)s
        """
        content_data = session['contents'][content_name]
        try:
            best_candidate = transport_data['best_candidate']
        except KeyError:
            # we have not our best candidate yet
            return
        try:
            peer_best_candidate = transport_data['peer_best_candidate']
        except KeyError:
            # we have not peer best candidate yet
            return

        # at this point we have both candidates, it's time to choose one
        if best_candidate is None or peer_best_candidate is None:
            choosed_candidate = best_candidate or peer_best_candidate
        elif best_candidate.priority == peer_best_candidate.priority:
            # same priority, we choose initiator one according to XEP-0260 §2.4 #4
            log.debug(u"Candidates have same priority, we choose the initiator one")
            if session['initiator'] == client.jid:
                choosed_candidate = best_candidate
            else:
                choosed_candidate = peer_best_candidate
        else:
            choosed_candidate = max(best_candidate, peer_best_candidate,
                                    key=lambda c: c.priority)

        if choosed_candidate is None:
            # nothing below can work without a candidate
            # TODO: fallback to IBB
            log.warning(u"Socks5 negociation failed, we need to fallback to IBB")
            return

        if choosed_candidate == peer_best_candidate:
            # peer_best_candidate was choosed from the candidates we have sent
            # so our_candidate is true if choosed_candidate is peer_best_candidate
            our_candidate = True
            # than also mean that best_candidate must be discarded !
            try:
                best_candidate.discard()
            except AttributeError:  # but it can be None
                pass
        else:
            our_candidate = False

        log.info(u"Socks5 negociation successful, {who} candidate will be used: {candidate}".format(
            who = u'our' if our_candidate else u'other peer',
            candidate = choosed_candidate))

        del transport_data['best_candidate']
        del transport_data['peer_best_candidate']

        if choosed_candidate.type == self._s5b.TYPE_PROXY:
            # the file transfer need to wait for proxy activation
            # (see XEP-0260 § 2.4)
            if our_candidate:
                d = self._s5b.connectCandidate(
                    choosed_candidate, transport_data['session_hash'], profile=client.profile)
                d.addCallback(lambda dummy: choosed_candidate.activate(
                    transport_data['sid'], session['peer_jid'], client))
                args = [choosed_candidate, session, content_name, client.profile]
                d.addCallbacks(self._proxyActivatedCb, self._proxyActivatedEb, args, None, args)
            else:
                # this Deferred will be called when we'll receive activation confirmation from other peer
                d = transport_data['activation_d'] = defer.Deferred()
        else:
            d = defer.succeed(None)

        if content_data['senders'] == session['role']:
            # we can now start the file transfer (or start it after proxy activation)
            d.addCallback(lambda dummy: choosed_candidate.startTransfer(
                transport_data['session_hash']))

    def _candidateInfo(self, candidate_elt, session, content_name, transport_data, client):
        """Called when best candidate has been received from peer (or if none is working)

        @param candidate_elt(domish.Element): candidate-used or candidate-error element
            (see XEP-0260 §2.3)
        @param session(dict): session data
        @param content_name(unicode): name of the current content
        @param transport_data(dict): transport data
        @param client(unicode): %(doc_client)s
        @raise exceptions.DataError: cid is missing or doesn't match a candidate we have sent
        @raise exceptions.InternalError: our candidates are not in transport_data
        """
        if candidate_elt.name == 'candidate-error':
            # candidate-error, no candidate worked
            transport_data['peer_best_candidate'] = None
        else:
            # candidate-used, one candidate was choosed
            try:
                cid = candidate_elt.attributes['cid']
            except KeyError:
                log.warning(u"No cid found in <candidate-used>")
                raise exceptions.DataError
            try:
                candidate = next(c for c in transport_data['candidates'] if c.id == cid)
            except StopIteration:
                log.warning(u"Given cid doesn't correspond to any known candidate !")
                raise exceptions.DataError  # TODO: send an error to other peer, and use better exception
            except KeyError:
                # a transport-info can also be intentionaly sent too early by other peer
                # but there is little probability
                log.error(u'"candidates" key doesn\'t exists in transport_data, it should at this point')
                raise exceptions.InternalError
            # at this point we have the candidate choosed by other peer
            transport_data['peer_best_candidate'] = candidate
            log.info(u"Other peer best candidate: {}".format(candidate))

        del transport_data['candidates']
        self._checkCandidates(session, content_name, transport_data, client)

    def _proxyActivationInfo(self, proxy_elt, session, content_name, transport_data, client):
        """Called when proxy has been activated (or has sent an error)

        @param proxy_elt(domish.Element): <activated/> or <proxy-error/> element
            (see XEP-0260 §2.4)
        @param session(dict): session data
        @param content_name(unicode): name of the current content
        @param transport_data(dict): transport data
        @param client(unicode): %(doc_client)s
        """
        try:
            activation_d = transport_data.pop('activation_d')
        except KeyError:
            # we were not waiting for any activation, there is nothing to fire
            log.warning(u"Received unexpected transport-info for proxy activation")
            return

        if proxy_elt.name == 'activated':
            activation_d.callback(None)
        else:
            # errback expects an exception instance (or a Failure), not a class
            activation_d.errback(ProxyError())

    @defer.inlineCallbacks
    def jingleHandler(self, action, session, content_name, transport_elt, profile):
        """Handle a jingle action for this transport

        @param action(unicode): jingle action (one of the XEP-0166 plugin A_* constants)
        @param session(dict): jingle session data
        @param content_name(unicode): name of the current content
        @param transport_elt(domish.Element): received <transport> element
        @param profile: %(doc_profile)s
        @return (D(domish.Element)): <transport> element: the received one, except
            on session-initiate where a new one is built with our candidates
        """
        client = self.host.getClient(profile)
        content_data = session['contents'][content_name]
        transport_data = content_data['transport_data']

        if action in (self._j.A_ACCEPTED_ACK,):
            pass

        elif action == self._j.A_SESSION_ACCEPT:
            # initiator side, we select a candidate in the ones sent by responder
            assert 'peer_candidates' not in transport_data
            transport_data['peer_candidates'] = self._parseCandidates(transport_elt)

        elif action == self._j.A_START:
            session_hash = transport_data['session_hash']
            peer_candidates = transport_data['peer_candidates']
            file_obj = content_data['file_obj']
            stream_d = self._s5b.registerHash(session_hash, file_obj, profile)
            stream_d.chainDeferred(content_data['finished_d'])
            d = self._s5b.getBestCandidate(peer_candidates, session_hash, profile)
            d.addCallback(self._foundPeerCandidate, session, transport_data, content_name, client)

        elif action == self._j.A_SESSION_INITIATE:
            # responder side, we select a candidate in the ones sent by initiator
            # and we give our candidates
            assert 'peer_candidates' not in transport_data
            sid = transport_data['sid'] = transport_elt['sid']
            session_hash = transport_data['session_hash'] = self._s5b.getSessionHash(
                session['peer_jid'], client.jid, sid)
            peer_candidates = transport_data['peer_candidates'] = self._parseCandidates(transport_elt)
            file_obj = content_data['file_obj']
            stream_d = self._s5b.registerHash(session_hash, file_obj, profile)
            stream_d.chainDeferred(content_data['finished_d'])
            d = self._s5b.getBestCandidate(peer_candidates, session_hash, profile)
            d.addCallback(self._foundPeerCandidate, session, transport_data, content_name, client)
            candidates = yield self._s5b.getCandidates(profile)
            # we remove duplicate candidates
            candidates = [candidate for candidate in candidates
                          if candidate not in peer_candidates]

            transport_data['candidates'] = candidates
            # we can now build a new <transport> element with our candidates
            transport_elt = self._buildCandidates(session, candidates, sid, session_hash, client)

        elif action == self._j.A_TRANSPORT_INFO:
            # transport-info can be about candidate or proxy activation
            candidate_elt = None

            for method, names in ((self._candidateInfo, ('candidate-used', 'candidate-error')),
                                  (self._proxyActivationInfo, ('activated', 'proxy-error'))):
                for name in names:
                    try:
                        candidate_elt = next(transport_elt.elements(NS_JINGLE_S5B, name))
                    except StopIteration:
                        continue
                    else:
                        method(candidate_elt, session, content_name, transport_data, client)
                        # only the first matching element of each family is handled
                        break

            if candidate_elt is None:
                log.warning(u"Unexpected transport element: {}".format(transport_elt.toXml()))
        else:
            log.warning(u"FIXME: unmanaged action {}".format(action))

        defer.returnValue(transport_elt)


class XEP_0260_handler(XMPPHandler):
    """Handler advertising the Jingle SOCKS5 Bytestreams namespace in service discovery"""
    implements(iwokkel.IDisco)

    def getDiscoInfo(self, requestor, target, nodeIdentifier=''):
        return [disco.DiscoFeature(NS_JINGLE_S5B)]

    def getDiscoItems(self, requestor, target, nodeIdentifier=''):
        return []