diff src/plugins/plugin_xep_0065.py @ 398:cb0285372818

File transfer: - proxy managed in XEP-0065 (Socks5 bytestream) - bug: fixed a bad id used during stream negociation
author Goffi <goffi@goffi.org>
date Wed, 05 Oct 2011 16:49:57 +0200
parents 8f3551ceee17
children c513328ade9d
line wrap: on
line diff
--- a/src/plugins/plugin_xep_0065.py	Wed Oct 05 16:48:25 2011 +0200
+++ b/src/plugins/plugin_xep_0065.py	Wed Oct 05 16:49:57 2011 +0200
@@ -296,8 +296,12 @@
                 self.loseConnection()
                 return
 
-            self.state = STATE_TARGET_READY
-            self.factory.activateCb(self.sid, self.factory.iq_id)
+            if self.factory.proxy:
+                self.state = STATE_READY
+                self.factory.activateCb(self.sid, self.factory.iq_id, self.startTransfer)
+            else:
+                self.state = STATE_TARGET_READY
+                self.factory.activateCb(self.sid, self.factory.iq_id)
 
         except struct.error, why:
             return None
@@ -405,12 +409,20 @@
 class Socks5ClientFactory(protocol.ClientFactory):
     protocol = SOCKSv5
 
-    def __init__(self, current_stream, sid, iq_id, activateCb, finishedCb):
+    def __init__(self, current_stream, sid, iq_id, activateCb, finishedCb, proxy=False):
+        """Init the Client Factory
+        @param current_stream: current streams data
+        @param sid: Session ID
+        @param iq_id: iq id used to initiate the stream
+        @param activateCb: method to call to activate the stream
+        @param finishedCb: method to call when the stream session is finished
+        @param proxy: True if we are connecting throught a proxy (and we are a requester)"""
         self.data = current_stream[sid]
         self.sid = sid
         self.iq_id = iq_id
         self.activateCb = activateCb
         self.finishedCb = finishedCb
+        self.proxy = proxy
 
     def startedConnecting(self, connector):
         debug (_("Socks 5 client connection started"))
@@ -432,6 +444,13 @@
         <param name="Port" value="28915" type="string" />
     </category>
     </general>
+    <individual>
+    <category name="File Transfer">
+        <param name="Proxy" value="" type="string" />
+        <param name="Proxy host" value="" type="string" />
+        <param name="Proxy port" value="" type="string" />
+    </category>
+    </individual>
     </params>
     """
 
@@ -503,19 +522,6 @@
         else:
             failure_cb(sid, file_obj, NS_BS, failure_reason)
 
-    def setData(self, data, id):
-        self.data = data
-        self.transfer_id = id
-        
-    def sendFile(self, id, filepath, size):
-        #lauching socks5 requester
-        debug(_("Launching socks5 requester"))
-        self.server_factory.protocol.mode = "requester"
-        self.server_factory.protocol.filepath = filepath
-        self.server_factory.protocol.filesize = size
-        self.server_factory.protocol.transfer_id = id
-
-
     def startStream(self, file_obj, to_jid, sid, length, successCb, failureCb, size = None, profile='@NONE@'):
         """Launch the stream workflow
         @param file_obj: file_obj to send
@@ -529,9 +535,14 @@
             error(_('stream length not managed yet'))
             return;
         profile_jid, xmlstream = self.host.getJidNStream(profile)
+        if not profile_jid or not xmlstream:
+            error(_("Unknown profile, this should not happen"))
+            return;
         data = self.current_stream[sid] = {}
+        data["profile"] = profile
         data["timer"] = reactor.callLater(TIMEOUT, self._timeOut, sid)
         data["file_obj"] = file_obj
+        data["from"] = profile_jid
         data["to"] = to_jid
         data["success_cb"] = successCb
         data["failure_cb"] = failureCb
@@ -547,10 +558,18 @@
         query_elt = iq_elt.addElement('query', NS_BS)
         query_elt['mode'] = 'tcp'
         query_elt['sid'] = sid
+        #first streamhost: direct connection
         streamhost = query_elt.addElement('streamhost')
-        streamhost['host'] = "127.0.0.1" #self.host.memory.getParamA("IP", "File Transfer")
+        streamhost['host'] = self.host.memory.getParamA("IP", "File Transfer")
         streamhost['port'] = self.host.memory.getParamA("Port", "File Transfer")
         streamhost['jid'] = profile_jid.full()
+
+        #second streamhost: mediated connection, using proxy
+        streamhost = query_elt.addElement('streamhost')
+        streamhost['host'] = self.host.memory.getParamA("Proxy host", "File Transfer", profile_key=profile)
+        streamhost['port'] = self.host.memory.getParamA("Proxy port", "File Transfer", profile_key=profile)
+        streamhost['jid'] = self.host.memory.getParamA("Proxy", "File Transfer", profile_key=profile)
+
         iq_elt.addCallback(self.iqResult, sid)
         iq_elt.send()
 
@@ -560,20 +579,60 @@
             warning(_("Transfer failed"))
             return
         
-        try: 
+        try:
             data = self.current_stream[sid]
-            callback = data["start_transfer_cb"]
             file_obj = data["file_obj"]
             timer = data["timer"]
+            profile = data["profile"]
         except KeyError:
             error(_("Internal error, can't do transfer"))
             return
         
         if timer.active():
             timer.cancel()
+        
+        profile_jid, xmlstream = self.host.getJidNStream(profile)
+        query_elt = iq_elt.firstChildElement()
+        streamhost_elts = filter(lambda elt: elt.name == 'streamhost-used', query_elt.elements())
+        if not streamhost_elts:
+            warning(_("No streamhost found in stream query"))
+            return
 
-        callback(file_obj)
+        streamhost_jid = streamhost_elts[0]['jid']
+        if streamhost_jid != profile_jid.full():
+            debug(_("A proxy server is used"))
+            proxy_host = self.host.memory.getParamA("Proxy host", "File Transfer", profile_key=profile)
+            proxy_port = self.host.memory.getParamA("Proxy port", "File Transfer", profile_key=profile)
+            proxy_jid = self.host.memory.getParamA("Proxy", "File Transfer", profile_key=profile)
+            if proxy_jid != streamhost_jid:
+                warning(_("Proxy jid is not the same as in parameters, this should not happen"))
+                return
+            factory = Socks5ClientFactory(self.current_stream, sid, None, self.activateProxyStream, self._killId, True)
+            reactor.connectTCP(proxy_host, int(proxy_port), factory)
+        else:
+            data["start_transfer_cb"](file_obj) #We now activate the stream
 
+    def activateProxyStream(self, sid, iq_id, start_transfer_cb):
+        debug(_("activating stream"))
+        data = self.current_stream[sid]
+        profile = data['profile']
+        profile_jid, xmlstream = self.host.getJidNStream(profile)
+
+        iq_elt = client.IQ(xmlstream,'set')
+        iq_elt["from"] = profile_jid.full()
+        iq_elt["to"] = self.host.memory.getParamA("Proxy", "File Transfer", profile_key=profile)
+        query_elt = iq_elt.addElement('query', NS_BS)
+        query_elt['sid'] = sid
+        query_elt.addElement('activate', content=data['to'].full())
+        iq_elt.addCallback(self.proxyResult, sid, start_transfer_cb, data['file_obj'])
+        iq_elt.send()
+
+    def proxyResult(self, sid, start_transfer_cb, file_obj, iq_elt):
+        if iq_elt['type'] == 'error':
+            warning(_("Can't activate the proxy stream"))
+            return
+        else:
+            start_transfer_cb(file_obj)
 
     def prepareToReceive(self, from_jid, sid, file_obj, size, success_cb, failure_cb):
         """Called when a bytestream is imminent
@@ -680,9 +739,47 @@
     def __init__(self, plugin_parent):
         self.plugin_parent = plugin_parent
         self.host = plugin_parent.host
+
+    def _proxyDataResult(self, iq_elt):
+        """Called with the informations about proxy according to XEP-0065 #4
+        Params should be filled with these infos"""
+        if iq_elt["type"] == "error":
+            warning(_("Can't determine proxy informations"))
+            return
+        query_elt = iq_elt.firstChildElement()
+        if query_elt.name != "query":
+            warning(_("Bad answer received from proxy"))
+            return
+        streamhost_elts = filter(lambda elt: elt.name == 'streamhost', query_elt.elements())
+        if not streamhost_elts:
+            warning(_("No streamhost found in stream query"))
+            return
+        if len(streamhost_elts) != 1:
+            warning(_("Multiple streamhost elements in proxy not managed, keeping only the first one"))
+        streamhost_elt = streamhost_elts[0]
+        proxy = self.host.memory.setParam("Proxy", streamhost_elt.getAttribute("jid",""), "File Transfer", self.parent.profile)
+        proxy = self.host.memory.setParam("Proxy host", streamhost_elt.getAttribute("host",""), "File Transfer", self.parent.profile)
+        proxy = self.host.memory.setParam("Proxy port", streamhost_elt.getAttribute("port",""), "File Transfer", self.parent.profile)
+
     
     def connectionInitialized(self):
+        def after_init(ignore):
+            proxy_ent = self.host.memory.getServerServiceEntity("proxy", "bytestreams", self.parent.profile)
+            if not proxy_ent:
+                debug(_("No proxy found on this server"))
+                return
+            iq_elt = client.IQ(self.parent.xmlstream,'get')
+            iq_elt["to"] = proxy_ent.full()
+            query_elt = iq_elt.addElement('query', NS_BS)
+            iq_elt.addCallback(self._proxyDataResult)
+            iq_elt.send()
+
+
         self.xmlstream.addObserver(BS_REQUEST, self.plugin_parent.streamQuery, profile = self.parent.profile)
+        proxy = self.host.memory.getParamA("Proxy", "File Transfer", profile_key = self.parent.profile)
+        if not proxy:
+            self.parent.client_initialized.addCallback(after_init)
+            
 
 
     def getDiscoInfo(self, requestor, target, nodeIdentifier=''):