view sat/plugins/plugin_xep_0260.py @ 2566:735467eb6cf7

core: replaced gtk3reactor by gireactor
author Goffi <goffi@goffi.org>
date Tue, 03 Apr 2018 20:54:17 +0200
parents 26edcf3a30eb
children 56f94936df1e
line wrap: on
line source

#!/usr/bin/env python2
# -*- coding: utf-8 -*-

# SAT plugin for Jingle (XEP-0260)
# Copyright (C) 2009-2018 Jérôme Poisson (goffi@goffi.org)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

from sat.core.i18n import _
from sat.core.constants import Const as C
from sat.core.log import getLogger
log = getLogger(__name__)
from sat.core import exceptions
from wokkel import disco, iwokkel
from zope.interface import implements
from twisted.words.xish import domish
from twisted.words.protocols.jabber import jid
from twisted.internet import defer
import uuid

try:
    from twisted.words.protocols.xmlstream import XMPPHandler
except ImportError:
    from wokkel.subprotocols import XMPPHandler


NS_JINGLE_S5B = 'urn:xmpp:jingle:transports:s5b:1'

PLUGIN_INFO = {
    C.PI_NAME: "Jingle SOCKS5 Bytestreams",
    C.PI_IMPORT_NAME: "XEP-0260",
    C.PI_TYPE: "XEP",
    C.PI_MODES: C.PLUG_MODE_BOTH,
    C.PI_PROTOCOLS: ["XEP-0260"],
    C.PI_DEPENDENCIES: ["XEP-0166", "XEP-0065"],
    C.PI_RECOMMENDATIONS: ["XEP-0261"], # needed for fallback
    C.PI_MAIN: "XEP_0260",
    C.PI_HANDLER: "yes",
    C.PI_DESCRIPTION: _("""Implementation of Jingle SOCKS5 Bytestreams""")
}


class ProxyError(Exception):

    def __str__(self):
        return "an error happened while trying to use the proxy"


class XEP_0260(object):
    # TODO: udp handling

    def __init__(self, host):
        log.info(_("plugin Jingle SOCKS5 Bytestreams"))
        self.host = host
        self._j = host.plugins["XEP-0166"] # shortcut to access jingle
        self._s5b = host.plugins["XEP-0065"] # and socks5 bytestream
        try:
            self._jingle_ibb = host.plugins["XEP-0261"]
        except KeyError:
            self._jingle_ibb = None
        self._j.registerTransport(NS_JINGLE_S5B, self._j.TRANSPORT_STREAMING, self, 100)

    def getHandler(self, client):
        return XEP_0260_handler()

    def _parseCandidates(self, transport_elt):
        """Parse <candidate> elements

        @param transport_elt(domish.Element): parent <transport> element
        @return (list[plugin_xep_0065.Candidate): list of parsed candidates
        """
        candidates = []
        for candidate_elt in transport_elt.elements(NS_JINGLE_S5B, 'candidate'):
            try:
                cid = candidate_elt['cid']
                host = candidate_elt['host']
                jid_= jid.JID(candidate_elt['jid'])
                port = int(candidate_elt.getAttribute('port', 1080))
                priority = int(candidate_elt['priority'])
                type_ = candidate_elt.getAttribute('type', self._s5b.TYPE_DIRECT)
            except (KeyError, ValueError):
                raise exceptions.DataError()
            candidate = self._s5b.Candidate(host, port, type_, priority, jid_, cid)
            candidates.append(candidate)
            # self._s5b.registerCandidate(candidate)
        return candidates

    def _buildCandidates(self, session, candidates, sid, session_hash, client, mode=None):
        """Build <transport> element with candidates

        @param session(dict): jingle session data
        @param candidates(iterator[plugin_xep_0065.Candidate]): iterator of candidates to add
        @param sid(unicode): transport stream id
        @param client: %(doc_client)s
        @param mode(str, None): 'tcp' or 'udp', or None to have no attribute
        @return (domish.Element): parent <transport> element where <candidate> elements must be added
        """
        proxy = next((candidate for candidate in candidates if candidate.type == self._s5b.TYPE_PROXY), None)
        transport_elt = domish.Element((NS_JINGLE_S5B, "transport"))
        transport_elt['sid'] = sid
        if proxy is not None:
            transport_elt['dstaddr'] = session_hash
        if mode is not None:
            transport_elt['mode'] = 'tcp' # XXX: we only manage tcp for now

        for candidate in candidates:
            log.debug(u"Adding candidate: {}".format(candidate))
            candidate_elt = transport_elt.addElement('candidate', NS_JINGLE_S5B)
            if candidate.id is None:
                candidate.id = unicode(uuid.uuid4())
            candidate_elt['cid'] = candidate.id
            candidate_elt['host'] = candidate.host
            candidate_elt['jid'] = candidate.jid.full()
            candidate_elt['port'] = unicode(candidate.port)
            candidate_elt['priority'] = unicode(candidate.priority)
            candidate_elt['type'] = candidate.type
        return transport_elt

    @defer.inlineCallbacks
    def jingleSessionInit(self, client, session, content_name):
        content_data = session['contents'][content_name]
        transport_data = content_data['transport_data']
        sid = transport_data['sid'] = unicode(uuid.uuid4())
        session_hash = transport_data['session_hash'] = self._s5b.getSessionHash(client.jid, session['peer_jid'], sid)
        transport_data['peer_session_hash'] = self._s5b.getSessionHash(session['peer_jid'], client.jid, sid) # requester and target are inversed for peer candidates
        transport_data['stream_d'] = self._s5b.registerHash(client, session_hash, None)
        candidates = transport_data['candidates'] = yield self._s5b.getCandidates(client)
        mode = 'tcp' # XXX: we only manage tcp for now
        transport_elt = self._buildCandidates(session, candidates, sid, session_hash, client, mode)

        defer.returnValue(transport_elt)

    def _proxyActivatedCb(self, iq_result_elt, client, candidate, session, content_name):
        """Called when activation confirmation has been received from proxy

        cf XEP-0260 § 2.4
        """
        # now that the proxy is activated, we have to inform other peer
        iq_elt, transport_elt = self._j.buildAction(client, self._j.A_TRANSPORT_INFO, session, content_name)
        activated_elt = transport_elt.addElement('activated')
        activated_elt['cid'] = candidate.id
        iq_elt.send()

    def _proxyActivatedEb(self, stanza_error, client, candidate, session, content_name):
        """Called when activation error has been received from proxy

        cf XEP-0260 § 2.4
        """
        # TODO: fallback to IBB
        # now that the proxy is activated, we have to inform other peer
        iq_elt, transport_elt = self._j.buildAction(client, self._j.A_TRANSPORT_INFO, session, content_name)
        transport_elt.addElement('proxy-error')
        iq_elt.send()
        log.warning(u"Can't activate proxy, we need to fallback to IBB: {reason}"
            .format(reason = stanza_error.value.condition))
        self.doFallback(session, content_name, client)

    def _foundPeerCandidate(self, candidate, session, transport_data, content_name, client):
        """Called when the best candidate from other peer is found

        @param candidate(XEP_0065.Candidate, None): selected candidate,
            or None if no candidate is accessible
        @param session(dict):  session data
        @param transport_data(dict): transport data
        @param content_name(unicode): name of the current content
        @param client(unicode): %(doc_client)s
        """

        transport_data['best_candidate'] = candidate
        # we need to disconnect all non selected candidates before removing them
        for c in transport_data['peer_candidates']:
            if c is None or c is candidate:
                continue
            c.discard()
        del transport_data['peer_candidates']
        iq_elt, transport_elt = self._j.buildAction(client, self._j.A_TRANSPORT_INFO, session, content_name)
        if candidate is None:
            log.warning(u"Can't connect to any peer candidate")
            candidate_elt = transport_elt.addElement('candidate-error')
        else:
            log.info(u"Found best peer candidate: {}".format(unicode(candidate)))
            candidate_elt = transport_elt.addElement('candidate-used')
            candidate_elt['cid'] = candidate.id
        iq_elt.send() # TODO: check result stanza
        self._checkCandidates(session, content_name, transport_data, client)

    def _checkCandidates(self, session, content_name, transport_data, client):
        """Called when a candidate has been choosed

        if we have both candidates, we select one, or fallback to an other transport
        @param session(dict):  session data
        @param content_name(unicode): name of the current content
        @param transport_data(dict): transport data
        @param client(unicode): %(doc_client)s
        """
        content_data = session['contents'][content_name]
        try:
            best_candidate = transport_data['best_candidate']
        except KeyError:
            # we have not our best candidate yet
            return
        try:
            peer_best_candidate = transport_data['peer_best_candidate']
        except KeyError:
            # we have not peer best candidate yet
            return

        # at this point we have both candidates, it's time to choose one
        if best_candidate is None or peer_best_candidate is None:
            choosed_candidate = best_candidate or peer_best_candidate
        else:
            if best_candidate.priority == peer_best_candidate.priority:
                # same priority, we choose initiator one according to XEP-0260 §2.4 #4
                log.debug(u"Candidates have same priority, we select the one choosed by initiator")
                if session['initiator'] == client.jid:
                    choosed_candidate = best_candidate
                else:
                    choosed_candidate = peer_best_candidate
            else:
                choosed_candidate = max(best_candidate, peer_best_candidate, key=lambda c:c.priority)

        if choosed_candidate is None:
            log.warning(u"Socks5 negociation failed, we need to fallback to IBB")
            self.doFallback(session, content_name, client)
        else:
            if choosed_candidate == peer_best_candidate:
                # peer_best_candidate was choosed from the candidates we have sent
                # so our_candidate is true if choosed_candidate is peer_best_candidate
                our_candidate = True
                # than also mean that best_candidate must be discarded !
                try:
                    best_candidate.discard()
                except AttributeError: # but it can be None
                    pass
            else:
                our_candidate = False

            log.info(u"Socks5 negociation successful, {who} candidate will be used: {candidate}".format(
                who = u'our' if our_candidate else u'other peer',
                candidate = choosed_candidate))
            del transport_data['best_candidate']
            del transport_data['peer_best_candidate']

            if choosed_candidate.type == self._s5b.TYPE_PROXY:
                # the stream transfer need to wait for proxy activation
                # (see XEP-0260 § 2.4)
                if our_candidate:
                    d = self._s5b.connectCandidate(client, choosed_candidate, transport_data['session_hash'])
                    d.addCallback(lambda dummy: choosed_candidate.activate(transport_data['sid'], session['peer_jid'], client))
                    args = [client, choosed_candidate, session, content_name]
                    d.addCallbacks(self._proxyActivatedCb, self._proxyActivatedEb, args, None, args)
                else:
                    # this Deferred will be called when we'll receive activation confirmation from other peer
                    d = transport_data['activation_d'] = defer.Deferred()
            else:
                d = defer.succeed(None)

            if content_data['senders'] == session['role']:
                # we can now start the stream transfer (or start it after proxy activation)
                d.addCallback(lambda dummy: choosed_candidate.startTransfer(transport_data['session_hash']))
                d.addErrback(self._startEb, session, content_name, client)

    def _startEb(self, fail, session, content_name, client):
        """Called when it's not possible to start the transfer

        Will try to fallback to IBB
        """
        try:
            reason = unicode(fail.value)
        except AttributeError:
            reason = unicode(fail)
        log.warning(u"Cant start transfert, we'll try fallback method: {}".format(reason))
        self.doFallback(session, content_name, client)

    def _candidateInfo(self, candidate_elt, session, content_name, transport_data, client):
        """Called when best candidate has been received from peer (or if none is working)

        @param candidate_elt(domish.Element): candidate-used or candidate-error element
            (see XEP-0260 §2.3)
        @param session(dict):  session data
        @param content_name(unicode): name of the current content
        @param transport_data(dict): transport data
        @param client(unicode): %(doc_client)s
        """
        if candidate_elt.name == 'candidate-error':
            # candidate-error, no candidate worked
            transport_data['peer_best_candidate'] = None
        else:
            # candidate-used, one candidate was choosed
            try:
                cid = candidate_elt.attributes['cid']
            except KeyError:
                log.warning(u"No cid found in <candidate-used>")
                raise exceptions.DataError
            try:
                candidate = (c for c in transport_data['candidates'] if c.id == cid).next()
            except StopIteration:
                log.warning(u"Given cid doesn't correspond to any known candidate !")
                raise exceptions.DataError # TODO: send an error to other peer, and use better exception
            except KeyError:
                # a transport-info can also be intentionaly sent too early by other peer
                # but there is little probability
                log.error(u'"candidates" key doesn\'t exists in transport_data, it should at this point')
                raise exceptions.InternalError
            # at this point we have the candidate choosed by other peer
            transport_data['peer_best_candidate'] = candidate
            log.info(u"Other peer best candidate: {}".format(candidate))

        del transport_data['candidates']
        self._checkCandidates(session, content_name, transport_data, client)

    def _proxyActivationInfo(self, proxy_elt, session, content_name, transport_data, client):
        """Called when proxy has been activated (or has sent an error)

        @param proxy_elt(domish.Element): <activated/> or <proxy-error/> element
            (see XEP-0260 §2.4)
        @param session(dict):  session data
        @param content_name(unicode): name of the current content
        @param transport_data(dict): transport data
        @param client(unicode): %(doc_client)s
        """
        try:
            activation_d = transport_data.pop('activation_d')
        except KeyError:
            log.warning(u"Received unexpected transport-info for proxy activation")

        if proxy_elt.name == 'activated':
            activation_d.callback(None)
        else:
            activation_d.errback(ProxyError())

    @defer.inlineCallbacks
    def jingleHandler(self, client, action, session, content_name, transport_elt):
        content_data = session['contents'][content_name]
        transport_data = content_data['transport_data']

        if action in (self._j.A_ACCEPTED_ACK, self._j.A_PREPARE_RESPONDER):
            pass

        elif action == self._j.A_SESSION_ACCEPT:
            # initiator side, we select a candidate in the ones sent by responder
            assert 'peer_candidates' not in transport_data
            transport_data['peer_candidates'] = self._parseCandidates(transport_elt)

        elif action == self._j.A_START:
            session_hash = transport_data['session_hash']
            peer_candidates = transport_data['peer_candidates']
            stream_object = content_data['stream_object']
            self._s5b.associateStreamObject(client, session_hash, stream_object)
            stream_d = transport_data.pop('stream_d')
            stream_d.chainDeferred(content_data['finished_d'])
            peer_session_hash = transport_data['peer_session_hash']
            d = self._s5b.getBestCandidate(client, peer_candidates, session_hash, peer_session_hash)
            d.addCallback(self._foundPeerCandidate, session, transport_data, content_name, client)

        elif action == self._j.A_SESSION_INITIATE:
            # responder side, we select a candidate in the ones sent by initiator
            # and we give our candidates
            assert 'peer_candidates' not in transport_data
            sid = transport_data['sid'] = transport_elt['sid']
            session_hash = transport_data['session_hash'] = self._s5b.getSessionHash(client.jid, session['peer_jid'], sid)
            peer_session_hash = transport_data['peer_session_hash'] = self._s5b.getSessionHash(session['peer_jid'], client.jid, sid) # requester and target are inversed for peer candidates
            peer_candidates = transport_data['peer_candidates'] = self._parseCandidates(transport_elt)
            stream_object = content_data['stream_object']
            stream_d = self._s5b.registerHash(client, session_hash, stream_object)
            stream_d.chainDeferred(content_data['finished_d'])
            d = self._s5b.getBestCandidate(client, peer_candidates, session_hash, peer_session_hash)
            d.addCallback(self._foundPeerCandidate, session, transport_data, content_name, client)
            candidates = yield self._s5b.getCandidates(client)
            # we remove duplicate candidates
            candidates = [candidate for candidate in candidates if candidate not in peer_candidates]

            transport_data['candidates'] = candidates
            # we can now build a new <transport> element with our candidates
            transport_elt = self._buildCandidates(session, candidates, sid, session_hash, client)

        elif action == self._j.A_TRANSPORT_INFO:
            # transport-info can be about candidate or proxy activation
            candidate_elt = None

            for method, names in ((self._candidateInfo, ('candidate-used', 'candidate-error')),
                                  (self._proxyActivationInfo, ('activated', 'proxy-error'))):
                for name in names:
                    try:
                        candidate_elt = transport_elt.elements(NS_JINGLE_S5B, name).next()
                    except StopIteration:
                        continue
                    else:
                        method(candidate_elt, session, content_name, transport_data, client)
                        break

            if candidate_elt is None:
                log.warning(u"Unexpected transport element: {}".format(transport_elt.toXml()))
        elif action == self._j.A_DESTROY:
            # the transport is replaced (fallback ?), We need mainly to kill XEP-0065 session.
            # note that sid argument is not necessary for sessions created by this plugin
            self._s5b.killSession(None, transport_data['session_hash'], None, client)
        else:
            log.warning(u"FIXME: unmanaged action {}".format(action))

        defer.returnValue(transport_elt)

    def jingleTerminate(self, client, action, session, content_name, reason_elt):
        if reason_elt.decline:
            log.debug(u"Session declined, deleting S5B session")
            # we just need to clean the S5B session if it is declined
            content_data = session['contents'][content_name]
            transport_data = content_data['transport_data']
            self._s5b.killSession(None, transport_data['session_hash'], None, client)

    def _doFallback(self, feature_checked, session, content_name, client):
        """Do the fallback, method called once feature is checked

         @param feature_checked(bool): True if other peer can do IBB
         """
        if not feature_checked:
            log.warning(u"Other peer can't manage jingle IBB, be have to terminate the session")
            self._j.terminate(client, self._j.REASON_CONNECTIVITY_ERROR, session)
        else:
            self._j.transportReplace(client, self._jingle_ibb.NAMESPACE, session, content_name)

    def doFallback(self, session, content_name, client):
        """Fallback to IBB transport, used in last resort

        @param session(dict):  session data
        @param content_name(unicode): name of the current content
        @param client(unicode): %(doc_client)s
        """
        if session['role'] != self._j.ROLE_INITIATOR:
            # only initiator must do the fallback, see XEP-0260 §3
            return
        if self._jingle_ibb is None:
            log.warning(u"Jingle IBB (XEP-0261) plugin is not available, we have to close the session")
            self._j.terminate(client, self._j.REASON_CONNECTIVITY_ERROR, session)
        else:
            d = self.host.hasFeature(client, self._jingle_ibb.NAMESPACE, session['peer_jid'])
            d.addCallback(self._doFallback, session, content_name, client)
        return d


class XEP_0260_handler(XMPPHandler):
    implements(iwokkel.IDisco)

    def getDiscoInfo(self, requestor, target, nodeIdentifier=''):
        return [disco.DiscoFeature(NS_JINGLE_S5B)]

    def getDiscoItems(self, requestor, target, nodeIdentifier=''):
        return []