changeset 398:cb0285372818

File transfer: - proxy managed in XEP-0065 (Socks5 bytestream) - bug: fixed a bad id used during stream negociation
author Goffi <goffi@goffi.org>
date Wed, 05 Oct 2011 16:49:57 +0200
parents ccfd69d090c3
children 3ed53803b3b3
files src/plugins/plugin_xep_0065.py src/plugins/plugin_xep_0095.py src/plugins/plugin_xep_0096.py src/tools/memory.py
diffstat 4 files changed, 138 insertions(+), 42 deletions(-) [+]
line wrap: on
line diff
--- a/src/plugins/plugin_xep_0065.py	Wed Oct 05 16:48:25 2011 +0200
+++ b/src/plugins/plugin_xep_0065.py	Wed Oct 05 16:49:57 2011 +0200
@@ -296,8 +296,12 @@
                 self.loseConnection()
                 return
 
-            self.state = STATE_TARGET_READY
-            self.factory.activateCb(self.sid, self.factory.iq_id)
+            if self.factory.proxy:
+                self.state = STATE_READY
+                self.factory.activateCb(self.sid, self.factory.iq_id, self.startTransfer)
+            else:
+                self.state = STATE_TARGET_READY
+                self.factory.activateCb(self.sid, self.factory.iq_id)
 
         except struct.error, why:
             return None
@@ -405,12 +409,20 @@
 class Socks5ClientFactory(protocol.ClientFactory):
     protocol = SOCKSv5
 
-    def __init__(self, current_stream, sid, iq_id, activateCb, finishedCb):
+    def __init__(self, current_stream, sid, iq_id, activateCb, finishedCb, proxy=False):
+        """Init the Client Factory
+        @param current_stream: current streams data
+        @param sid: Session ID
+        @param iq_id: iq id used to initiate the stream
+        @param activateCb: method to call to activate the stream
+        @param finishedCb: method to call when the stream session is finished
+        @param proxy: True if we are connecting throught a proxy (and we are a requester)"""
         self.data = current_stream[sid]
         self.sid = sid
         self.iq_id = iq_id
         self.activateCb = activateCb
         self.finishedCb = finishedCb
+        self.proxy = proxy
 
     def startedConnecting(self, connector):
         debug (_("Socks 5 client connection started"))
@@ -432,6 +444,13 @@
         <param name="Port" value="28915" type="string" />
     </category>
     </general>
+    <individual>
+    <category name="File Transfer">
+        <param name="Proxy" value="" type="string" />
+        <param name="Proxy host" value="" type="string" />
+        <param name="Proxy port" value="" type="string" />
+    </category>
+    </individual>
     </params>
     """
 
@@ -503,19 +522,6 @@
         else:
             failure_cb(sid, file_obj, NS_BS, failure_reason)
 
-    def setData(self, data, id):
-        self.data = data
-        self.transfer_id = id
-        
-    def sendFile(self, id, filepath, size):
-        #lauching socks5 requester
-        debug(_("Launching socks5 requester"))
-        self.server_factory.protocol.mode = "requester"
-        self.server_factory.protocol.filepath = filepath
-        self.server_factory.protocol.filesize = size
-        self.server_factory.protocol.transfer_id = id
-
-
     def startStream(self, file_obj, to_jid, sid, length, successCb, failureCb, size = None, profile='@NONE@'):
         """Launch the stream workflow
         @param file_obj: file_obj to send
@@ -529,9 +535,14 @@
             error(_('stream length not managed yet'))
             return;
         profile_jid, xmlstream = self.host.getJidNStream(profile)
+        if not profile_jid or not xmlstream:
+            error(_("Unknown profile, this should not happen"))
+            return;
         data = self.current_stream[sid] = {}
+        data["profile"] = profile
         data["timer"] = reactor.callLater(TIMEOUT, self._timeOut, sid)
         data["file_obj"] = file_obj
+        data["from"] = profile_jid
         data["to"] = to_jid
         data["success_cb"] = successCb
         data["failure_cb"] = failureCb
@@ -547,10 +558,18 @@
         query_elt = iq_elt.addElement('query', NS_BS)
         query_elt['mode'] = 'tcp'
         query_elt['sid'] = sid
+        #first streamhost: direct connection
         streamhost = query_elt.addElement('streamhost')
-        streamhost['host'] = "127.0.0.1" #self.host.memory.getParamA("IP", "File Transfer")
+        streamhost['host'] = self.host.memory.getParamA("IP", "File Transfer")
         streamhost['port'] = self.host.memory.getParamA("Port", "File Transfer")
         streamhost['jid'] = profile_jid.full()
+
+        #second streamhost: mediated connection, using proxy
+        streamhost = query_elt.addElement('streamhost')
+        streamhost['host'] = self.host.memory.getParamA("Proxy host", "File Transfer", profile_key=profile)
+        streamhost['port'] = self.host.memory.getParamA("Proxy port", "File Transfer", profile_key=profile)
+        streamhost['jid'] = self.host.memory.getParamA("Proxy", "File Transfer", profile_key=profile)
+
         iq_elt.addCallback(self.iqResult, sid)
         iq_elt.send()
 
@@ -560,20 +579,60 @@
             warning(_("Transfer failed"))
             return
         
-        try: 
+        try:
             data = self.current_stream[sid]
-            callback = data["start_transfer_cb"]
             file_obj = data["file_obj"]
             timer = data["timer"]
+            profile = data["profile"]
         except KeyError:
             error(_("Internal error, can't do transfer"))
             return
         
         if timer.active():
             timer.cancel()
+        
+        profile_jid, xmlstream = self.host.getJidNStream(profile)
+        query_elt = iq_elt.firstChildElement()
+        streamhost_elts = filter(lambda elt: elt.name == 'streamhost-used', query_elt.elements())
+        if not streamhost_elts:
+            warning(_("No streamhost found in stream query"))
+            return
 
-        callback(file_obj)
+        streamhost_jid = streamhost_elts[0]['jid']
+        if streamhost_jid != profile_jid.full():
+            debug(_("A proxy server is used"))
+            proxy_host = self.host.memory.getParamA("Proxy host", "File Transfer", profile_key=profile)
+            proxy_port = self.host.memory.getParamA("Proxy port", "File Transfer", profile_key=profile)
+            proxy_jid = self.host.memory.getParamA("Proxy", "File Transfer", profile_key=profile)
+            if proxy_jid != streamhost_jid:
+                warning(_("Proxy jid is not the same as in parameters, this should not happen"))
+                return
+            factory = Socks5ClientFactory(self.current_stream, sid, None, self.activateProxyStream, self._killId, True)
+            reactor.connectTCP(proxy_host, int(proxy_port), factory)
+        else:
+            data["start_transfer_cb"](file_obj) #We now activate the stream
 
+    def activateProxyStream(self, sid, iq_id, start_transfer_cb):
+        debug(_("activating stream"))
+        data = self.current_stream[sid]
+        profile = data['profile']
+        profile_jid, xmlstream = self.host.getJidNStream(profile)
+
+        iq_elt = client.IQ(xmlstream,'set')
+        iq_elt["from"] = profile_jid.full()
+        iq_elt["to"] = self.host.memory.getParamA("Proxy", "File Transfer", profile_key=profile)
+        query_elt = iq_elt.addElement('query', NS_BS)
+        query_elt['sid'] = sid
+        query_elt.addElement('activate', content=data['to'].full())
+        iq_elt.addCallback(self.proxyResult, sid, start_transfer_cb, data['file_obj'])
+        iq_elt.send()
+
+    def proxyResult(self, sid, start_transfer_cb, file_obj, iq_elt):
+        if iq_elt['type'] == 'error':
+            warning(_("Can't activate the proxy stream"))
+            return
+        else:
+            start_transfer_cb(file_obj)
 
     def prepareToReceive(self, from_jid, sid, file_obj, size, success_cb, failure_cb):
         """Called when a bytestream is imminent
@@ -680,9 +739,47 @@
     def __init__(self, plugin_parent):
         self.plugin_parent = plugin_parent
         self.host = plugin_parent.host
+
+    def _proxyDataResult(self, iq_elt):
+        """Called with the informations about proxy according to XEP-0065 #4
+        Params should be filled with these infos"""
+        if iq_elt["type"] == "error":
+            warning(_("Can't determine proxy informations"))
+            return
+        query_elt = iq_elt.firstChildElement()
+        if query_elt.name != "query":
+            warning(_("Bad answer received from proxy"))
+            return
+        streamhost_elts = filter(lambda elt: elt.name == 'streamhost', query_elt.elements())
+        if not streamhost_elts:
+            warning(_("No streamhost found in stream query"))
+            return
+        if len(streamhost_elts) != 1:
+            warning(_("Multiple streamhost elements in proxy not managed, keeping only the first one"))
+        streamhost_elt = streamhost_elts[0]
+        proxy = self.host.memory.setParam("Proxy", streamhost_elt.getAttribute("jid",""), "File Transfer", self.parent.profile)
+        proxy = self.host.memory.setParam("Proxy host", streamhost_elt.getAttribute("host",""), "File Transfer", self.parent.profile)
+        proxy = self.host.memory.setParam("Proxy port", streamhost_elt.getAttribute("port",""), "File Transfer", self.parent.profile)
+
     
     def connectionInitialized(self):
+        def after_init(ignore):
+            proxy_ent = self.host.memory.getServerServiceEntity("proxy", "bytestreams", self.parent.profile)
+            if not proxy_ent:
+                debug(_("No proxy found on this server"))
+                return
+            iq_elt = client.IQ(self.parent.xmlstream,'get')
+            iq_elt["to"] = proxy_ent.full()
+            query_elt = iq_elt.addElement('query', NS_BS)
+            iq_elt.addCallback(self._proxyDataResult)
+            iq_elt.send()
+
+
         self.xmlstream.addObserver(BS_REQUEST, self.plugin_parent.streamQuery, profile = self.parent.profile)
+        proxy = self.host.memory.getParamA("Proxy", "File Transfer", profile_key = self.parent.profile)
+        if not proxy:
+            self.parent.client_initialized.addCallback(after_init)
+            
 
 
     def getDiscoInfo(self, requestor, target, nodeIdentifier=''):
--- a/src/plugins/plugin_xep_0095.py	Wed Oct 05 16:48:25 2011 +0200
+++ b/src/plugins/plugin_xep_0095.py	Wed Oct 05 16:49:57 2011 +0200
@@ -75,13 +75,13 @@
         info (_("XEP-0095 Stream initiation"))
         iq_el.handled=True
         si_el = iq_el.firstChildElement()
-        si_id = iq_el.getAttribute('id')
+        si_id = si_el.getAttribute('id')
         si_mime_type = iq_el.getAttribute('mime-type', 'application/octet-stream')
         si_profile = si_el.getAttribute('profile')
         si_profile_key =  si_profile[len(SI_PROFILE_HEADER):] if si_profile.startswith(SI_PROFILE_HEADER) else si_profile
         if self.si_profiles.has_key(si_profile_key):
             #We know this SI profile, we call the callback
-            self.si_profiles[si_profile_key](iq_el['from'], si_id, si_mime_type, si_el, profile)
+            self.si_profiles[si_profile_key](iq_el['id'], iq_el['from'], si_id, si_mime_type, si_el, profile)
         else:
             #We don't know this profile, we send an error
             self.sendBadProfileError(iq_el['id'], iq_el['from'], profile)
--- a/src/plugins/plugin_xep_0096.py	Wed Oct 05 16:48:25 2011 +0200
+++ b/src/plugins/plugin_xep_0096.py	Wed Oct 05 16:49:57 2011 +0200
@@ -70,8 +70,9 @@
         except KeyError:
             warning(_("kill id called on a non existant approval id"))
     
-    def transferRequest(self, from_jid, si_id, si_mime_type, si_el, profile):
+    def transferRequest(self, iq_id, from_jid, si_id, si_mime_type, si_el, profile):
         """Called when a file transfer is requested
+        @param iq_id: id of the iq request
         @param from_jid: jid of the sender
         @param si_id: Stream Initiation session id
         @param si_mime_type: Mime type of the file (or default "application/octet-stream" if unknown)
@@ -102,7 +103,7 @@
                     can_range = True
         else:
             warning(_("No file element found"))
-            self.host.plugins["XEP-0095"].sendBadRequestError(si_id, from_jid, profile)
+            self.host.plugins["XEP-0095"].sendBadRequestError(iq_id, from_jid, profile)
             return
             
         if feature_elts:
@@ -112,19 +113,19 @@
                 stream_method = self.host.plugins["XEP-0020"].negociate(feature_el, 'stream-method',self.managed_stream_m)
             except KeyError:
                 warning(_("No stream method found"))
-                self.host.plugins["XEP-0095"].sendBadRequestError(si_id, from_jid, profile)
+                self.host.plugins["XEP-0095"].sendBadRequestError(iq_id, from_jid, profile)
                 return
             if not stream_method:
                 warning(_("Can't find a valid stream method"))
-                self.host.plugins["XEP-0095"].sendFailedError(si_id, from_jid, profile)
+                self.host.plugins["XEP-0095"].sendFailedError(iq_id, from_jid, profile)
                 return
         else:
             warning(_("No feature element found"))
-            self.host.plugins["XEP-0095"].sendBadRequestError(si_id, from_jid, profile)
+            self.host.plugins["XEP-0095"].sendBadRequestError(iq_id, from_jid, profile)
             return
 
         #if we are here, the transfer can start, we just need user's agreement
-        data={ "filename":filename, "from":from_jid, "size":file_size, "date":file_date, "hash":file_hash, "desc":file_desc, "can_range": str(can_range) }
+        data={ "filename":filename, "id": iq_id, "from":from_jid, "size":file_size, "date":file_date, "hash":file_hash, "desc":file_desc, "can_range": str(can_range) }
         self._waiting_for_approval[si_id] = [data, reactor.callLater(300, self._kill_id, si_id), stream_method, [], profile]
 
         self.host.askConfirmation(si_id, "FILE_TRANSFER", data, self.confirmationCB)
@@ -137,12 +138,12 @@
         @return: File Object"""
         return open(dest_path, "ab" if can_range else "wb")
 
-    def confirmationCB(self, id, accepted, frontend_data):
+    def confirmationCB(self, sid, accepted, frontend_data):
         """Called on confirmation answer
-        @param id: file transfer session id
+        @param sid: file transfer session id
         @param accepted: True if file transfer is accepted
         @param frontend_data: data sent by frontend"""
-        data, timeout, stream_method, failed_methods, profile = self._waiting_for_approval[id]
+        data, timeout, stream_method, failed_methods, profile = self._waiting_for_approval[sid]
         can_range = data['can_range'] == "True"
         range_offset = 0
         if accepted:
@@ -152,20 +153,19 @@
                 dest_path = frontend_data['dest_path']
             except KeyError:
                 error(_('dest path not found in frontend_data'))
-                del(self._waiting_for_approval[id])
+                del(self._waiting_for_approval[sid])
                 return
             if stream_method == self.host.plugins["XEP-0065"].NAMESPACE:
                 file_obj = self._getFileObject(dest_path, can_range)
                 range_offset = file_obj.tell()
-                self.host.plugins["XEP-0065"].prepareToReceive(jid.JID(data['from']), id, file_obj, int(data["size"]), self._transferSucceeded, self._transferFailed)
-                #self.host.plugins["XEP-0065"].setData(data, id)
+                self.host.plugins["XEP-0065"].prepareToReceive(jid.JID(data['from']), sid, file_obj, int(data["size"]), self._transferSucceeded, self._transferFailed)
             elif stream_method == self.host.plugins["XEP-0047"].NAMESPACE:
                 file_obj = self._getFileObject(dest_path, can_range)
                 range_offset = file_obj.tell()
-                self.host.plugins["XEP-0047"].prepareToReceive(jid.JID(data['from']), id, file_obj, int(data["size"]), self._transferSucceeded, self._transferFailed)
+                self.host.plugins["XEP-0047"].prepareToReceive(jid.JID(data['from']), sid, file_obj, int(data["size"]), self._transferSucceeded, self._transferFailed)
             else:
                 error(_("Unknown stream method, this should not happen at this stage, cancelling transfer"))
-                del(self._waiting_for_approval[id])
+                del(self._waiting_for_approval[sid])
                 return
 
             #we can send the iq result
@@ -177,11 +177,11 @@
                 range_elt['offset'] = str(range_offset)
                 #TODO: manage range length
                 misc_elts.append(range_elt)
-            self.host.plugins["XEP-0095"].acceptStream(id, data['from'], feature_elt, misc_elts, profile)
+            self.host.plugins["XEP-0095"].acceptStream(data["id"], data['from'], feature_elt, misc_elts, profile)
         else:
-            debug (_("Transfer [%s] refused"), id)
-            self.host.plugins["XEP-0095"].sendRejectedError (id, data['from'], profile=profile)
-            del(self._waiting_for_approval[id])
+            debug (_("Transfer [%s] refused"), sid)
+            self.host.plugins["XEP-0095"].sendRejectedError (data["id"], data['from'], profile=profile)
+            del(self._waiting_for_approval[sid])
 
     def _transferSucceeded(self, sid, file_obj, stream_method):
         """Called by the stream method when transfer successfuly finished
@@ -289,4 +289,4 @@
 
     def sendFailureCb(self, sid, file_obj, stream_method, reason):
         file_obj.close()
-        warning(_('Transfer %(id)s failed with stream method %(s_method)s') % { 'id': sid, s_method: stream_method })
+        warning(_('Transfer %(id)s failed with stream method %(s_method)s') % { 'id': sid, "s_method": stream_method })
--- a/src/tools/memory.py	Wed Oct 05 16:48:25 2011 +0200
+++ b/src/tools/memory.py	Wed Oct 05 16:49:57 2011 +0200
@@ -667,7 +667,6 @@
         """Return list of contacts for given profile
         @param profile_key: profile key
         @return list of [contact, attr, groups]"""
-        debug ("Memory getContact OK (%s)", self.contacts)
         profile = self.getProfileName(profile_key)
         if not profile:
             error(_('Asking contacts for a non-existant profile'))