changeset 1560:dcce63810733

plugin XEP-0260: first draft
author Goffi <goffi@goffi.org>
date Mon, 02 Nov 2015 22:02:41 +0100
parents 7cc29634b6ef
children c09429bf587f
files src/plugins/plugin_xep_0260.py
diffstat 1 files changed, 315 insertions(+), 0 deletions(-) [+]
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/plugins/plugin_xep_0260.py	Mon Nov 02 22:02:41 2015 +0100
@@ -0,0 +1,315 @@
+#!/usr/bin/python
+# -*- coding: utf-8 -*-
+
+# SAT plugin for Jingle (XEP-0260)
+# Copyright (C) 2009, 2010, 2011, 2012, 2013, 2014, 2015 Jérôme Poisson (goffi@goffi.org)
+
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU Affero General Public License for more details.
+
+# You should have received a copy of the GNU Affero General Public License
+# along with this program.  If not, see <http://www.gnu.org/licenses/>.
+
+from sat.core.i18n import _
+from sat.core.log import getLogger
+log = getLogger(__name__)
+from sat.core import exceptions
+from wokkel import disco, iwokkel
+from zope.interface import implements
+from twisted.words.xish import domish
+from twisted.words.protocols.jabber import jid
+from twisted.internet import defer
+import uuid
+
+try:
+    from twisted.words.protocols.xmlstream import XMPPHandler
+except ImportError:
+    from wokkel.subprotocols import XMPPHandler
+
+
+NS_JINGLE_S5B = 'urn:xmpp:jingle:transports:s5b:1'
+
+PLUGIN_INFO = {
+    "name": "Jingle SOCKS5 Bytestreams",
+    "import_name": "XEP-0260",
+    "type": "XEP",
+    "protocols": ["XEP-0260"],
+    "dependencies": ["XEP-0166", "XEP-0065"],
+    "main": "XEP_0260",
+    "handler": "yes",
+    "description": _("""Implementation of Jingle SOCKS5 Bytestreams""")
+}
+
+
+class XEP_0260(object):
+    # TODO: udp handling
+
+    def __init__(self, host):
+        log.info(_("plugin Jingle SOCKS5 Bytestreams"))
+        self.host = host
+        self._j = host.plugins["XEP-0166"] # shortcut to access jingle
+        self._s5b = host.plugins["XEP-0065"] # and socks5 bytestream
+        self._j.registerTransport(NS_JINGLE_S5B, self._j.TRANSPORT_STREAMING, self, 100)
+
+    def getHandler(self, profile):
+        return XEP_0260_handler()
+
+    def _parseCandidates(self, transport_elt):
+        """Parse <candidate> elements
+
+        @param transport_elt(domish.Element): parent <transport> element
+        @return (list[plugin_xep_0065.Candidate): list of parsed candidates
+        """
+        candidates = []
+        for candidate_elt in transport_elt.elements(NS_JINGLE_S5B, 'candidate'):
+            try:
+                cid = candidate_elt['cid']
+                host = candidate_elt['host']
+                jid_= jid.JID(candidate_elt['jid'])
+                port = int(candidate_elt.getAttribute('port', 1080))
+                priority = int(candidate_elt['priority'])
+                type_ = candidate_elt.getAttribute('type', self._s5b.TYPE_DIRECT)
+            except (KeyError, ValueError):
+                raise exceptions.DataError()
+            candidate = self._s5b.Candidate(host, port, type_, priority, jid_, cid)
+            candidates.append(candidate)
+            # self._s5b.registerCandidate(candidate)
+        return candidates
+
+    def _buildCandidates(self, session, candidates, sid, session_hash, client, mode=None):
+        """Build <transport> element with candidates
+
+        @param session(dict): jingle session data
+        @param candidates(iterator[plugin_xep_0065.Candidate]): iterator of candidates to add
+        @param sid(unicode): transport stream id
+        @param client: %(doc_client)s
+        @param mode(str, None): 'tcp' or 'udp', or None to have no attribute
+        @return (domish.Element): parent <transport> element where <candidate> elements must be added
+        """
+        proxy = next((candidate for candidate in candidates if candidate.type == self._s5b.TYPE_PROXY), None)
+        transport_elt = domish.Element((NS_JINGLE_S5B, "transport"))
+        transport_elt['sid'] = sid
+        if proxy is not None:
+            transport_elt['dstaddr'] = session_hash
+        if mode is not None:
+            transport_elt['mode'] = 'tcp' # XXX: we only manage tcp for now
+
+        for candidate in candidates:
+            log.debug(u"Adding candidate: {}".format(candidate))
+            candidate_elt = transport_elt.addElement('candidate', NS_JINGLE_S5B)
+            if candidate.id is None:
+                candidate.id = unicode(uuid.uuid4())
+            candidate_elt['cid'] = candidate.id
+            candidate_elt['host'] = candidate.host
+            candidate_elt['jid'] = candidate.jid.full()
+            candidate_elt['port'] = unicode(candidate.port)
+            candidate_elt['priority'] = unicode(candidate.priority)
+            candidate_elt['type'] = candidate.type
+        return transport_elt
+
+    @defer.inlineCallbacks
+    def jingleSessionInit(self, session, content_name, profile):
+        client = self.host.getClient(profile)
+        content_data = session['contents'][content_name]
+        transport_data = content_data['transport_data']
+        sid = transport_data['sid'] = unicode(uuid.uuid4())
+        session_hash = transport_data['session_hash'] = self._s5b.getSessionHash(client.jid, session['to_jid'], sid)
+        candidates = transport_data['candidates'] = yield self._s5b.getCandidates(profile)
+        mode = 'tcp' # XXX: we only manage tcp for now
+        transport_elt = self._buildCandidates(session, candidates, sid, session_hash, client, mode)
+
+        defer.returnValue(transport_elt)
+
+    def _foundPeerCandidate(self, candidate, session, transport_data, content_name, client):
+        """Called when the best candidate from other peer is found
+
+        @param candidate(XEP_0065.Candidate, None): selected candidate,
+            or None if no candidate is accessible
+        @param session(dict):  session data
+        @param transport_data(dict): transport data
+        @param content_name(dict): name of the current content
+        @param client(unicode): %(doc_client)s
+        """
+
+        transport_data['best_candidate'] = candidate
+        # we need to disconnect all non selected candidates before removing them
+        for c in transport_data['peer_candidates']:
+            if c is None or c is candidate:
+                continue
+            c.discard()
+        del transport_data['peer_candidates']
+        iq_elt, transport_elt = self._j.buildAction(self._j.A_TRANSPORT_INFO, session, content_name, client.profile)
+        if candidate is None:
+            log.warning(u"Can't connect to any peer candidate")
+            candidate_elt = transport_elt.addElement('candidate-error')
+        else:
+            log.info(u"Found best peer candidate: {}".format(unicode(candidate)))
+            candidate_elt = transport_elt.addElement('candidate-used')
+            candidate_elt['cid'] = candidate.id
+        iq_elt.send() # TODO: check result stanza
+        content_data = session['contents'][content_name]
+        self._checkCandidates(session, content_data, transport_data, client)
+
+    def _checkCandidates(self, session, content_data, transport_data, client):
+        """Called when a candidate has been choosed
+
+        if we have both candidates, we select one, or fallback to an other transport
+        @param session(dict):  session data
+        @param content_data(dict): content data
+        @param transport_data(dict): transport data
+        @param client(unicode): %(doc_client)s
+        """
+        try:
+            best_candidate = transport_data['best_candidate']
+        except KeyError:
+            # we have not our best candidate yet
+            return
+        try:
+            peer_best_candidate = transport_data['peer_best_candidate']
+        except KeyError:
+            # we have not peer best candidate yet
+            return
+
+        # at this point we have both candidates, it's time to choose one
+        if best_candidate is None or peer_best_candidate is None:
+            choosed_candidate = best_candidate or peer_best_candidate
+        else:
+            if best_candidate.priority == peer_best_candidate.priority:
+                # same priority, we choose initiator one according to XEP-0260 §2.4 #4
+                log.debug(u"Candidates have same priority, we choose the initiator one")
+                if session['initiator'] == client.jid:
+                    choosed_candidate = best_candidate
+                else:
+                    choosed_candidate = peer_best_candidate
+            else:
+                choosed_candidate = max(best_candidate, peer_best_candidate, key=lambda c:c.priority)
+
+        if choosed_candidate is None:
+            log.warning(u"Socks5 negociation failed, we need to fallback to IBB")
+        else:
+            if choosed_candidate==best_candidate:
+                who = u'our'
+            else:
+                who = u'other peer'
+                best_candidate.discard()
+
+            log.info(u"Socks5 negociation successful, {who} candidate will be used: {candidate}".format(
+                who = who,
+                candidate = choosed_candidate))
+            del transport_data['best_candidate']
+            del transport_data['peer_best_candidate']
+            if content_data['senders'] == session['role']:
+                # we can now start the file transfer
+                choosed_candidate.startTransfer(transport_data['session_hash'])
+
+    @defer.inlineCallbacks
+    def jingleHandler(self, action, session, content_name, transport_elt, profile):
+        client = self.host.getClient(profile)
+        content_data = session['contents'][content_name]
+        transport_data = content_data['transport_data']
+
+        if action in (self._j.A_ACCEPTED_ACK,):
+            pass
+
+        elif action == self._j.A_SESSION_ACCEPT:
+            # initiator side, we select a candidate in the ones sent by responder
+            assert 'peer_candidates' not in transport_data
+            transport_data['peer_candidates'] = self._parseCandidates(transport_elt)
+
+        # elif action == self._j.A_START:
+        elif action == self._j.A_START:
+            session_hash = transport_data['session_hash']
+            peer_candidates = transport_data['peer_candidates']
+            file_obj = content_data['file_obj']
+            stream_d = self._s5b.registerHash(session_hash, file_obj, profile)
+            args = [session, content_name, profile]
+            stream_d.addCallbacks(self._streamCb, self._streamEb, args, None, args)
+            d = self._s5b.getBestCandidate(peer_candidates, session_hash, profile)
+            d.addCallback(self._foundPeerCandidate, session, transport_data, content_name, client)
+
+        elif action == self._j.A_SESSION_INITIATE:
+            # responder side, we select a candidate in the ones sent by initiator
+            # and we give our candidates
+            assert 'peer_candidates' not in transport_data
+            sid = transport_data['sid'] = transport_elt['sid']
+            session_hash = transport_data['session_hash'] = self._s5b.getSessionHash(session['to_jid'], client.jid, sid)
+            peer_candidates = transport_data['peer_candidates'] = self._parseCandidates(transport_elt)
+            file_obj = content_data['file_obj']
+            stream_d = self._s5b.registerHash(session_hash, file_obj, profile)
+            args = [session, content_name, profile]
+            stream_d.addCallbacks(self._streamCb, self._streamEb, args, None, args)
+            d = self._s5b.getBestCandidate(peer_candidates, session_hash, profile)
+            d.addCallback(self._foundPeerCandidate, session, transport_data, content_name, client)
+            candidates = yield self._s5b.getCandidates(profile)
+            # we remove duplicate candidates
+            candidates = [candidate for candidate in candidates if candidate not in peer_candidates]
+
+            transport_data['candidates'] = candidates
+            # we can now build a new <transport> element with our candidates
+            transport_elt = self._buildCandidates(session, candidates, sid, session_hash, client)
+
+        elif action == self._j.A_TRANSPORT_INFO:
+            # other peer gave us its choosed candidate
+            try:
+                candidate_elt = transport_elt.elements(NS_JINGLE_S5B, 'candidate-used').next()
+            except StopIteration:
+                try:
+                    candidate_elt = transport_elt.elements(NS_JINGLE_S5B, 'candidate-error').next()
+                except StopIteration:
+                    log.warning(u"Unexpected transport element: {}".format(transport_elt.toXml()))
+                    raise exceptions.DataError
+                else:
+                    # candidate-error, no candidate worked
+                    transport_data['peer_best_candidate'] = None
+            else:
+                # candidate-used, one candidate was choosed
+                try:
+                    cid = candidate_elt.attributes['cid']
+                except KeyError:
+                    log.warning(u"No cid found in <candidate-used>")
+                    raise exceptions.DataError
+                try:
+                    candidate = (c for c in transport_data['candidates'] if c.id == cid).next()
+                except StopIteration:
+                    log.warning(u"Given cid doesn't correspond to any known candidate !")
+                    raise exceptions.DataError # TODO: send an error to other peer, and use better exception
+                except KeyError:
+                    # a transport-info can also be intentionaly sent too early by other peer
+                    # but there is little probability
+                    log.error(u'"candidates" key doesn\'t exists in transport_data, it should at this point')
+                    raise exceptions.InternalError
+                # at this point we have the candidate choosed by other peer
+                transport_data['peer_best_candidate'] = candidate
+                log.info(u"Other peer best candidate: {}".format(candidate))
+
+            del transport_data['candidates']
+            self._checkCandidates(session, content_data, transport_data, client)
+
+        else:
+            log.warning(u"FIXME: unmanaged action {}".format(action))
+
+        defer.returnValue(transport_elt)
+
+    def _streamCb(self, dummy, session, content_name, profile):
+        self._j.contentTerminate(session, content_name, profile=profile)
+
+    def _streamEb(self, failure, session, content_name, profile):
+        log.warning(u"Error while streaming through s5b: {}".format(failure))
+        self._j.contentTerminate(session, content_name, reason=self._j.REASON_FAILED_TRANSPORT, profile=profile)
+
+
+class XEP_0260_handler(XMPPHandler):
+    implements(iwokkel.IDisco)
+
+    def getDiscoInfo(self, requestor, target, nodeIdentifier=''):
+        return [disco.DiscoFeature(NS_JINGLE_S5B)]
+
+    def getDiscoItems(self, requestor, target, nodeIdentifier=''):
+        return []