diff src/plugins/plugin_xep_0065.py @ 1577:d04d7402b8e9

plugins XEP-0020, XEP-0065, XEP-0095, XEP-0096: fixed file copy with Stream Initiation: /!\ range is not working yet /!\ pipe plugin is broken for now
author Goffi <goffi@goffi.org>
date Wed, 11 Nov 2015 18:19:49 +0100
parents d5f59ba166fe
children 8cc7d83141a4
line wrap: on
line diff
--- a/src/plugins/plugin_xep_0065.py	Wed Nov 11 18:19:49 2015 +0100
+++ b/src/plugins/plugin_xep_0065.py	Wed Nov 11 18:19:49 2015 +0100
@@ -63,13 +63,12 @@
 from twisted.internet import protocol
 from twisted.internet import reactor
 from twisted.internet import error as internet_error
-from twisted.words.protocols.jabber import jid, client as jabber_client
 from twisted.words.protocols.jabber import error as jabber_error
+from twisted.words.protocols.jabber import jid
+from twisted.words.protocols.jabber import xmlstream
 from twisted.protocols.basic import FileSender
-from twisted.words.xish import domish
 from twisted.internet import defer
 from twisted.python import failure
-from sat.core.exceptions import ProfileNotInCacheError
 from collections import namedtuple
 import struct
 import hashlib
@@ -432,7 +431,6 @@
             return None
 
     def _makeRequest(self):
-        # sha1 = getSessionHash(self.data["from"], self.data["to"], self.sid)
         hash_ = self._session_hash
         request = struct.pack('!5B%dsH' % len(hash_), SOCKS5_VER, CMD_CONNECT, 0, ADDR_DOMAINNAME, len(hash_), hash_, 0)
         self.transport.write(request)
@@ -464,13 +462,8 @@
                 self.loseConnection()
                 return
 
-            # if self.factory.proxy:
-            #     self.state = STATE_READY
-            #     self.factory.activateCb(self.sid, self.factory.iq_id, self.startTransfer, self.profile)
-            # else:
             self.state = STATE_READY
             self.connection.callback(None)
-            # self.factory.activateCb(self.sid, self.factory.iq_id, self.profile)
 
         except struct.error:
             # The buffer is probably not complete, we need to wait more
@@ -489,9 +482,6 @@
                 .format(host=self.transport.getPeer().host))
             return
         self._session_hash = addr
-        # self.sid, self.profile = self.factory.hash_profiles_map[addr]
-        # client = self.factory.host.getClient(self.profile)
-        # client.xep_0065_current_stream[self.sid]["start_transfer_cb"] = self.startTransfer
         self.connectCompleted(addr, 0)
 
     def startTransfer(self):
@@ -546,11 +536,6 @@
 
     def connectionLost(self, reason):
         log.debug(u"Socks5 connection lost: {}".format(reason.value))
-        # self.transport.unregisterProducer()
-        # if self.peersock is not None:
-        #     self.peersock.peersock = None
-        #     self.peersock.transport.unregisterProducer()
-        #     self.peersock = None
         if self.state != STATE_READY:
             self.connection.errback(reason)
         if self.server_mode :
@@ -620,7 +605,6 @@
 class Socks5ClientFactory(protocol.ClientFactory):
     protocol = SOCKSv5
 
-    # def __init__(self, stream_data, sid, iq_id, activateCb, finishedCb, proxy=False, profile=C.PROF_KEY_NONE):
     def __init__(self, parent, session_hash, profile):
         """Init the Client Factory
 
@@ -635,13 +619,6 @@
         self._protocol_instance = None
         self.connector = None
         self._discarded = False
-        # self.data = stream_data[sid]
-        # self.sid = sid
-        # self.iq_id = iq_id
-        # self.activateCb = activateCb
-        # self.finishedCb = finishedCb
-        # self.proxy = proxy
-        # self.profile = profile
 
     def discard(self):
         """Disconnect the client
@@ -671,7 +648,6 @@
                 self.getSession()[DEFER_KEY].callback(None)
             else:
                 self.getSession()[DEFER_KEY].errback(reason)
-        # self.finishedCb(self.sid, reason.type == internet_error.ConnectionDone, self.profile)  # TODO: really check if the state is actually successful
 
     def buildProtocol(self, addr):
         log.debug(("Socks 5 client connection started"))
@@ -719,7 +695,7 @@
 
     def profileConnected(self, profile):
         client = self.host.getClient(profile)
-        client.xep_0065_current_stream = {}  # key: stream_id, value: session_data(dict)
+        client.xep_0065_sid_session = {}  # key: stream_id, value: session_data(dict)
         client._s5b_sessions = {}
 
     def getSessionHash(self, from_jid, to_jid, sid):
@@ -776,7 +752,7 @@
             notFound(server)
         iq_elt = client.IQ('get')
         iq_elt['to'] = proxy.full()
-        iq_elt.addElement('query', NS_BS)
+        iq_elt.addElement((NS_BS, 'query'))
 
         try:
             result_elt = yield iq_elt.send()
@@ -914,6 +890,14 @@
         return defers_list
 
     def getBestCandidate(self, candidates, session_hash, profile=C.PROF_KEY_NONE):
+        """Get best candidate (according to priority) which can connect
+
+        @param candidates(iterable[Candidate]): candidates to test
+        @param session_hash(unicode): hash of the session
+            hash is the same as hostname computer in XEP-0065 ยง 5.3.2 #1
+        @param profile: %(doc_profile)s
+        @return (D(None, Candidate)): best candidate or None if none can connect
+        """
         defer_candidates = None
 
         def connectionCb(candidate, profile):
@@ -947,7 +931,7 @@
 
     def _timeOut(self, sid, client):
         """Delecte current_stream id, called after timeout
-        @param id: id of client.xep_0065_current_stream"""
+        @param id: id of client.xep_0065_sid_session"""
         log.info(_("Socks5 Bytestream: TimeOut reached for id {sid} [{profile}]").format(
             sid=sid, profile=client.profile))
         self._killSession(sid, client, u"TIMEOUT")
@@ -961,7 +945,7 @@
             else, will be used to call failure_cb
         """
         try:
-            session = client.xep_0065_current_stream[sid]
+            session = client.xep_0065_sid_session[sid]
         except KeyError:
             log.warning(_("kill id called on a non existant id"))
             return
@@ -976,7 +960,7 @@
         if session['timer'].active():
             session['timer'].cancel()
 
-        del client.xep_0065_current_stream[sid]
+        del client.xep_0065_sid_session[sid]
 
         # FIXME: to check
         try:
@@ -1004,105 +988,73 @@
         @param successCb: method to call when stream successfuly finished
         @param failureCb: method to call when something goes wrong
         @param profile: %(doc_profile)s
+        @return (D): Deferred fired when session is finished
         """
         client = self.host.getClient(profile)
-        session_data = self._createSession(file_obj, to_jid, sid, client.profile)
-
-        session_data["to"] = to_jid
-        session_data["xmlstream"] = client.xmlstream
-        hash_ = session_data["hash"] = getSessionHash(client.jid, to_jid, sid)
+        session_data = self._createSession(file_obj, to_jid, sid, True, client.profile)
 
-        self.hash_profiles_map[hash_] = (sid, profile)
-
-        iq_elt = jabber_client.IQ(client.xmlstream, 'set')
-        iq_elt["from"] = client.jid.full()
-        iq_elt["to"] = to_jid.full()
-        query_elt = iq_elt.addElement('query', NS_BS)
-        query_elt['mode'] = 'tcp'
-        query_elt['sid'] = sid
+        session_data[client] = client
 
-        #first streamhost: direct connection
-        streamhost = query_elt.addElement('streamhost')
-        streamhost['host'] = self.host.memory.getParamA("IP", "File Transfer")
-        streamhost['port'] = self.host.memory.getParamA("Port", "File Transfer")
-        streamhost['jid'] = client.jid.full()
+        def gotCandidates(candidates):
+            session_data['candidates'] = candidates
+            iq_elt = client.IQ()
+            iq_elt["from"] = client.jid.full()
+            iq_elt["to"] = to_jid.full()
+            query_elt = iq_elt.addElement((NS_BS, 'query'))
+            query_elt['mode'] = 'tcp'
+            query_elt['sid'] = sid
 
-        #second streamhost: mediated connection, using proxy
-        streamhost = query_elt.addElement('streamhost')
-        streamhost['host'] = self.host.memory.getParamA("Proxy host", "File Transfer", profile_key=profile)
-        streamhost['port'] = self.host.memory.getParamA("Proxy port", "File Transfer", profile_key=profile)
-        streamhost['jid'] = self.host.memory.getParamA("Proxy", "File Transfer", profile_key=profile)
+            for candidate in candidates:
+                streamhost = query_elt.addElement('streamhost')
+                streamhost['host'] = candidate.host
+                streamhost['port'] = str(candidate.port)
+                streamhost['jid'] = candidate.jid.full()
 
-        iq_elt.addCallback(self._IQOpen, session_data, client)
-        iq_elt.send()
+            d = iq_elt.send()
+            args = [session_data, client]
+            d.addCallbacks(self._IQNegotiationCb, self._IQNegotiationEb, args, None, args)
+
+        self.getCandidates(profile).addCallback(gotCandidates)
         return session_data[DEFER_KEY]
 
-    def _IQOpen(self, session_data, client, iq_elt):
+    def _IQNegotiationCb(self, iq_elt, session_data, client):
         """Called when the result of open iq is received
 
         @param session_data(dict): data of the session
         @param client: %(doc_client)s
         @param iq_elt(domish.Element): <iq> result
         """
-        sid = session_data['id']
-        if iq_elt["type"] == "error":
-            log.warning(_("Socks5 transfer failed"))
-            # FIXME: must clean session
-            return
-
         try:
-            session_data = client.xep_0065_current_stream[sid]
-            file_obj = session_data["file_obj"]
-            timer = session_data["timer"]
-        except KeyError:
-            raise exceptions.InternalError
-
-        timer.reset(TIMEOUT)
-
-        query_elt = iq_elt.elements(NS_BS, 'query').next()
-        streamhost_elts = list(query_elt.elements(NS_BS, 'streamhost-used'))
-
-        if not streamhost_elts:
-            log.warning(_("No streamhost found in stream query"))
+            query_elt = iq_elt.elements(NS_BS, 'query').next()
+            streamhost_used_elt = query_elt.elements(NS_BS, 'streamhost-used').next()
+        except StopIteration:
+            log.warning(u"No streamhost found in stream query")
             # FIXME: must clean session
             return
 
-        # FIXME: must be cleaned !
-
-        streamhost_jid = streamhost_elts[0]['jid']
-        if streamhost_jid != client.jid.full():
-            log.debug(_("A proxy server is used"))
-            proxy_host = self.host.memory.getParamA("Proxy host", "File Transfer", profile_key=client.profile)
-            proxy_port = self.host.memory.getParamA("Proxy port", "File Transfer", profile_key=client.profile)
-            proxy_jid = self.host.memory.getParamA("Proxy", "File Transfer", profile_key=client.profile)
-            if proxy_jid != streamhost_jid:
-                log.warning(_("Proxy jid is not the same as in parameters, this should not happen"))
-                return
-            factory = Socks5ClientFactory(client.xep_0065_current_stream, sid, None, self.activateProxyStream, lambda sid, success, profile: self._killSession(sid, client), True, client.profile)
-            reactor.connectTCP(proxy_host, int(proxy_port), factory)
-        else:
-            session_data["start_transfer_cb"](file_obj)  # We now activate the stream
+        streamhost_jid = jid.JID(streamhost_used_elt['jid'])
+        try:
+            candidate = (c for c in session_data['candidates'] if c.jid == streamhost_jid).next()
+        except StopIteration:
+            log.warning(u"Candidate [{jid}] is unknown !".format(jid=streamhost_jid.full()))
+            return
 
-    def activateProxyStream(self, sid, iq_id, start_transfer_cb, profile):
-        log.debug(_("activating stream"))
-        client = self.host.getClient(profile)
-        session_data = client.xep_0065_current_stream[sid]
+        if candidate.type == XEP_0065.TYPE_PROXY:
+            log.info(u"A Socks5 proxy is used")
+            d = self.connectCandidate(candidate, session_data['hash'], profile=client.profile)
+            d.addCallback(lambda dummy: candidate.activate(session_data['id'], session_data['peer_jid'], client))
+            d.addErrback(self._activationEb)
+        else:
+            d = defer.succeed(None)
 
-        iq_elt = client.IQ(client.xmlstream, 'set')
-        iq_elt["from"] = client.jid.full()
-        iq_elt["to"] = self.host.memory.getParamA("Proxy", "File Transfer", profile_key=profile)
-        query_elt = iq_elt.addElement('query', NS_BS)
-        query_elt['sid'] = sid
-        query_elt.addElement('activate', content=session_data['to'].full())
-        iq_elt.addCallback(self.proxyResult, sid, start_transfer_cb, session_data['file_obj'])
-        iq_elt.send()
+        d.addCallback(lambda dummy: candidate.startTransfer(session_data['hash']))
 
-    def proxyResult(self, sid, start_transfer_cb, file_obj, iq_elt):
-        if iq_elt['type'] == 'error':
-            log.warning(_("Can't activate the proxy stream"))
-            return
-        else:
-            start_transfer_cb(file_obj)
+    def _activationEb(self, failure):
+        log.warning(u"Proxy activation error: {}".format(failure.value))
+
+    def _IQNegotiationEb(self, stanza_err, session_data, client):
+        log.warning(u"Socks5 transfer failed: {}".format(stanza_err.condition))
+        # FIXME: must clean session
 
     def createSession(self, *args, **kwargs):
         """like [_createSession] but return the session deferred instead of the whole session
@@ -1111,26 +1063,34 @@
         """
         return self._createSession(*args, **kwargs)[DEFER_KEY]
 
-    def _createSession(self, file_obj, to_jid, sid, profile):
+    def _createSession(self, file_obj, to_jid, sid, requester=False, profile=C.PROF_KEY_NONE):
         """Called when a bytestream is imminent
 
         @param file_obj(file): File object where data will be written
         @param to_jid(jid.JId): jid of the other peer
         @param sid(unicode): session id
+        @param initiator(bool): if True, this session is create by initiator
         @param profile: %(doc_profile)s
         @return (dict): session data
         """
         client = self.host.getClient(profile)
-        if sid in client.xep_0065_current_stream:
+        if sid in client.xep_0065_sid_session:
             raise exceptions.ConflictError(u'A session with this id already exists !')
-        session_data = client.xep_0065_current_stream[sid] = \
+        if requester:
+            session_hash = getSessionHash(client.jid, to_jid, sid)
+            session_data = self._registerHash(session_hash, file_obj, profile)
+        else:
+            session_hash = getSessionHash(to_jid, client.jid, sid)
+            session_data = client._s5b_sessions[session_hash] = {
+                DEFER_KEY: defer.Deferred(),
+                }
+        client.xep_0065_sid_session[sid] = session_data
+        session_data.update(
             {'id': sid,
-             DEFER_KEY: defer.Deferred(),
-             'to': to_jid,
-             'file_obj': file_obj,
-             'seq': -1, # FIXME: to check
-             'timer': reactor.callLater(TIMEOUT, self._timeOut, sid, client),
-            }
+             'peer_jid': to_jid,
+             'file': file_obj,
+             'hash': session_hash,
+            })
 
         return session_data
 
@@ -1156,7 +1116,7 @@
         return client._s5b_sessions[session_hash]
 
     def registerHash(self, *args, **kwargs):
-        """like [_registerHash] but resutrn the session deferred instead of the whole session
+        """like [_registerHash] but resturn the session deferred instead of the whole session
         session deferred is fired when transfer is finished
         """
         return self._registerHash(*args, **kwargs)[DEFER_KEY]
@@ -1195,91 +1155,62 @@
         return session_data
 
     def streamQuery(self, iq_elt, profile):
-        """Get file using byte stream"""
-        log.debug(_("BS stream query"))
+        log.debug(u"BS stream query")
         client = self.host.getClient(profile)
 
-        if not client:
-            raise ProfileNotInCacheError
-
-        xmlstream = client.xmlstream
-
         iq_elt.handled = True
-        query_elt = iq_elt.firstChildElement()
-        sid = query_elt.getAttribute("sid")
-        streamhost_elts = filter(lambda elt: elt.name == 'streamhost', query_elt.elements())
 
-        if not sid in client.xep_0065_current_stream:
-            log.warning(_(u"Ignoring unexpected BS transfer: %s" % sid))
-            self.sendNotAcceptableError(iq_elt['id'], iq_elt['from'], xmlstream)
-            return
+        query_elt = iq_elt.elements(NS_BS, 'query').next()
+        try:
+            sid = query_elt['sid']
+        except KeyError:
+            log.warning(u"Invalid bystreams request received")
+            return client.sendError(iq_elt, "bad-request")
 
-        client.xep_0065_current_stream[sid]['timer'].cancel()
-        client.xep_0065_current_stream[sid]["to"] = jid.JID(iq_elt["to"])
-        client.xep_0065_current_stream[sid]["xmlstream"] = xmlstream
-
+        streamhost_elts = list(query_elt.elements(NS_BS, 'streamhost'))
         if not streamhost_elts:
-            log.warning(_(u"No streamhost found in stream query %s" % sid))
-            self.sendBadRequestError(iq_elt['id'], iq_elt['from'], xmlstream)
-            return
+            return client.sendError(iq_elt, "bad-request")
 
-        streamhost_elt = streamhost_elts[0]  # TODO: manage several streamhost elements case
-        sh_host = streamhost_elt.getAttribute("host")
-        sh_port = streamhost_elt.getAttribute("port")
-        sh_jid = streamhost_elt.getAttribute("jid")
-        if not sh_host or not sh_port or not sh_jid:
-            log.warning(_("incomplete streamhost element"))
-            self.sendBadRequestError(iq_elt['id'], iq_elt['from'], xmlstream)
-            return
+        try:
+            session_data = client.xep_0065_sid_session[sid]
+        except KeyError:
+            log.warning(u"Ignoring unexpected BS transfer: {}".format(sid))
+            return client.sendError(iq_elt, 'not-acceptable')
 
-        client.xep_0065_current_stream[sid]["streamhost"] = (sh_host, sh_port, sh_jid)
-
-        log.info(_("Stream proposed: host=[%(host)s] port=[%(port)s]") % {'host': sh_host, 'port': sh_port})
-        factory = Socks5ClientFactory(client.xep_0065_current_stream, sid, iq_elt["id"], self.activateStream, lambda sid, success, profile: self._killSession(sid, client), profile=profile)
-        reactor.connectTCP(sh_host, int(sh_port), factory)
+        peer_jid = session_data["peer_jid"] = jid.JID(iq_elt["from"])
 
-    def activateStream(self, sid, iq_id, profile):
-        client = self.host.getClient(profile)
-        log.debug(_("activating stream"))
-        result = domish.Element((None, 'iq'))
-        session_data = client.xep_0065_current_stream[sid]
-        result['type'] = 'result'
-        result['id'] = iq_id
-        result['from'] = session_data["to"].full()
-        result['to'] = session_data["from"].full()
-        query = result.addElement('query', NS_BS)
-        query['sid'] = sid
-        streamhost = query.addElement('streamhost-used')
-        streamhost['jid'] = session_data["streamhost"][2]
-        session_data["xmlstream"].send(result)
+        candidates = []
+        nb_sh = len(streamhost_elts)
+        for idx, sh_elt in enumerate(streamhost_elts):
+            try:
+                host, port, jid_ = sh_elt['host'], sh_elt['port'], jid.JID(sh_elt['jid'])
+            except KeyError:
+                log.warning(u"malformed streamhost element")
+                return client.sendError(iq_elt, "bad-request")
+            priority = nb_sh - idx
+            if jid_.userhostJID() != peer_jid.userhostJID():
+                type_ = XEP_0065.TYPE_PROXY
+            else:
+                type_ = XEP_0065.TYPE_DIRECT
+            candidates.append(Candidate(host, port, type_, priority, jid_))
 
-    def sendNotAcceptableError(self, iq_id, to_jid, xmlstream):
-        """Not acceptable error used when the stream is not expected or something is going wrong
-        @param iq_id: IQ id
-        @param to_jid: addressee
-        @param xmlstream: XML stream to use to send the error"""
-        result = domish.Element((None, 'iq'))
-        result['type'] = 'result'
-        result['id'] = iq_id
-        result['to'] = to_jid
-        error_el = result.addElement('error')
-        error_el['type'] = 'modify'
-        error_el.addElement(('urn:ietf:params:xml:ns:xmpp-stanzas', 'not-acceptable'))
-        xmlstream.send(result)
+        for candidate in candidates:
+            log.info(u"Candidate proposed: {}".format(candidate))
+
+        d = self.getBestCandidate(candidates, session_data['hash'], profile)
+        d.addCallback(self._ackStream, iq_elt, session_data, client)
 
-    def sendBadRequestError(self, iq_id, to_jid, xmlstream):
-        """Not acceptable error used when the stream is not expected or something is going wrong
-        @param iq_id: IQ id
-        @param to_jid: addressee
-        @param xmlstream: XML stream to use to send the error"""
-        result = domish.Element((None, 'iq'))
-        result['type'] = 'result'
-        result['id'] = iq_id
-        result['to'] = to_jid
-        error_el = result.addElement('error')
-        error_el['type'] = 'cancel'
-        error_el.addElement(('urn:ietf:params:xml:ns:xmpp-stanzas', 'bad-request'))
-        xmlstream.send(result)
+    def _ackStream(self, candidate, iq_elt, session_data, client):
+        if candidate is None:
+            log.info("No streamhost candidate worked, we have to end negotiation")
+            return client.sendError(iq_elt, 'item-not-found')
+        log.debug(u"activating stream")
+        result_elt = xmlstream.toResponse(iq_elt, 'result')
+        query_elt = result_elt.addElement((NS_BS, 'query'))
+        query_elt['sid'] = session_data['id']
+        streamhost_used_elt = query_elt.addElement('streamhost-used')
+        streamhost_used_elt['jid'] = candidate.jid.full()
+        client.xmlstream.send(result_elt)
 
 
 class XEP_0065_handler(XMPPHandler):