diff src/plugins/plugin_xep_0065.py @ 538:2c4016921403

core, frontends, bridgen plugins: fixed methods which were unproperly managing multi-profiles - added profile argument to askConfirmation, actionResult, actionResultExt, entityDataUpdated, confirmationAnswer, getProgress - core, frontends: fixed calls/signals according to new bridge API - user of proper profile namespace for progression indicators and dialogs - memory: getParam* now return bool when param type is bool - memory: added getStringParam* to return string instead of typed value - core, memory, storage, quick_frontend: getHistory now manage properly multi-profiles - plugins XEP-0047, XEP-0054, XEP-0065, XEP-0077, XEP-0096; multi-profiles proper handling
author Goffi <goffi@goffi.org>
date Sat, 10 Nov 2012 16:38:16 +0100
parents a31abb97310d
children ca13633d3b6b
line wrap: on
line diff
--- a/src/plugins/plugin_xep_0065.py	Sun Nov 04 23:53:26 2012 +0100
+++ b/src/plugins/plugin_xep_0065.py	Sat Nov 10 16:38:16 2012 +0100
@@ -58,13 +58,13 @@
 from logging import debug, info, warning, error
 from twisted.internet import protocol, reactor
 from twisted.internet import error as jab_error
-from twisted.words.protocols.jabber import client, jid
+from twisted.words.protocols.jabber import jid, client as jabber_client
 from twisted.protocols.basic import FileSender
 from twisted.words.xish import domish
 from twisted.web.client import getPage
+from sat.core.exceptions import ProfileNotInCacheError
 import struct
-import urllib
-import hashlib, pdb
+import hashlib
 
 from zope.interface import implements
 
@@ -298,10 +298,10 @@
 
             if self.factory.proxy:
                 self.state = STATE_READY
-                self.factory.activateCb(self.sid, self.factory.iq_id, self.startTransfer)
+                self.factory.activateCb(self.sid, self.factory.iq_id, self.startTransfer, self.profile)
             else:
                 self.state = STATE_TARGET_READY
-                self.factory.activateCb(self.sid, self.factory.iq_id)
+                self.factory.activateCb(self.sid, self.factory.iq_id, self.profile)
 
         except struct.error, why:
             return None
@@ -311,6 +311,7 @@
         
         if isinstance(self.factory, Socks5ClientFactory):
             self.sid = self.factory.sid
+            self.profile = self.factory.profile
             self.data = self.factory.data
             self.state = STATE_TARGET_INITIAL
             self._startNegotiation()
@@ -318,13 +319,16 @@
     def connectRequested(self, addr, port):
         debug("connectRequested")
         
-        # Check that this session if expected
+        # Check that this session is expected
         if not self.factory.hash_sid_map.has_key(addr):
             #no: we refuse it
-            self.sendErrorReply(socks5.REPLY_CONN_REFUSED)
+            self.sendErrorReply(REPLY_CONN_REFUSED)
             return
-        self.sid = self.factory.hash_sid_map[addr]
-        self.factory.current_stream[self.sid]["start_transfer_cb"] = self.startTransfer
+        self.sid, self.profile = self.factory.hash_sid_map[addr]
+        client = self.factory.host.getClient(self.profile)
+        if not client:
+            raise ProfileNotInCacheError
+        client.xep_0065_current_stream[self.sid]["start_transfer_cb"] = self.startTransfer
         self.connectCompleted(addr, 0)
         self.transport.stopReading()
 
@@ -336,7 +340,7 @@
     def fileTransfered(self, d):
         info(_("File transfer completed, closing connection"))
         self.transport.loseConnection()
-        self.factory.finishedCb(self.sid, True)
+        self.factory.finishedCb(self.sid, True, self.profile)
 
     def connectCompleted(self, remotehost, remoteport):
         debug("connectCompleted")
@@ -395,8 +399,8 @@
 class Socks5ServerFactory(protocol.ServerFactory):
     protocol = SOCKSv5
 
-    def __init__(self, current_stream, hash_sid_map, finishedCb):
-        self.current_stream = current_stream
+    def __init__(self, host, hash_sid_map, finishedCb):
+        self.host = host
         self.hash_sid_map = hash_sid_map
         self.finishedCb = finishedCb
 
@@ -409,27 +413,30 @@
 class Socks5ClientFactory(protocol.ClientFactory):
     protocol = SOCKSv5
 
-    def __init__(self, current_stream, sid, iq_id, activateCb, finishedCb, proxy=False):
+    def __init__(self, current_stream, sid, iq_id, activateCb, finishedCb, proxy=False, profile=None):
         """Init the Client Factory
         @param current_stream: current streams data
         @param sid: Session ID
         @param iq_id: iq id used to initiate the stream
         @param activateCb: method to call to activate the stream
         @param finishedCb: method to call when the stream session is finished
-        @param proxy: True if we are connecting throught a proxy (and we are a requester)"""
+        @param proxy: True if we are connecting throught a proxy (and we are a requester)
+        @param profile: %(doc_profile)s"""
+        assert(profile)
         self.data = current_stream[sid]
         self.sid = sid
         self.iq_id = iq_id
         self.activateCb = activateCb
         self.finishedCb = finishedCb
         self.proxy = proxy
+        self.profile = profile
 
     def startedConnecting(self, connector):
         debug (_("Socks 5 client connection started"))
 
     def clientConnectionLost(self, connector, reason):
-        debug (_("Socks 5 client connection lost"))
-        self.finishedCb(self.sid, reason.type == jab_error.ConnectionDone) #TODO: really check if the state is actually successful
+        debug (_("Socks 5 client connection lost (reason: %s)"), reason)
+        self.finishedCb(self.sid, reason.type == jab_error.ConnectionDone, self.profile) #TODO: really check if the state is actually successful
 
 
 class XEP_0065():
@@ -458,12 +465,11 @@
         info(_("Plugin XEP_0065 initialization"))
         
         #session data
-        self.current_stream = {} #key: stream_id, value: data(dict)
-        self.hash_sid_map = {}  #key: hash of the transfer session, value: session id
+        self.hash_sid_map = {}  #key: hash of the transfer session, value: (session id, profile)
         
         self.host = host
         debug(_("registering"))
-        self.server_factory = Socks5ServerFactory(self.current_stream, self.hash_sid_map, self._killId)
+        self.server_factory = Socks5ServerFactory(host, self.hash_sid_map, lambda sid, success, profile: self._killId(sid, success, profile=profile))
 
         #parameters
         host.memory.importParams(XEP_0065.params)
@@ -476,53 +482,69 @@
     def getHandler(self, profile):
         return XEP_0065_handler(self)  
    
+    def profileConnected(self, profile):
+        client = self.host.getClient(profile)
+        if not client:
+            raise ProfileNotInCacheError
+        client.xep_0065_current_stream = {} #key: stream_id, value: data(dict)
+
     def getExternalIP(self):
         """Return IP visible from outside, by asking to a website"""
         return getPage("http://www.goffi.org/sat_tools/get_ip.php")
 
-    def getProgress(self, sid, data):
+    def getProgress(self, sid, data, profile):
         """Fill data with position of current transfer"""
+        client = self.host.getClient(profile)
+        if not client:
+            raise ProfileNotInCacheError
         try:
-            file_obj = self.current_stream[sid]["file_obj"]
+            file_obj = client.xep_0065_current_stream[sid]["file_obj"]
             data["position"] = str(file_obj.tell())
-            data["size"] = str(self.current_stream[sid]["size"])
+            data["size"] = str(client.xep_0065_current_stream[sid]["size"])
         except:
             pass
     
-    def _timeOut(self, sid):
+    def _timeOut(self, sid, profile):
         """Delecte current_stream id, called after timeout
-        @param id: id of self.current_stream"""
-        info(_("Socks5 Bytestream: TimeOut reached for id %s") % sid);
-        self._killId(sid, False, "TIMEOUT")
+        @param id: id of client.xep_0065_current_stream"""
+        info(_("Socks5 Bytestream: TimeOut reached for id %s [%s]") % (sid, profile));
+        self._killId(sid, False, "TIMEOUT", profile)
     
-    def _killId(self, sid, success=False, failure_reason="UNKNOWN"):
+    def _killId(self, sid, success=False, failure_reason="UNKNOWN", profile=None):
         """Delete an current_stream id, clean up associated observers
-        @param sid: id of self.current_stream"""
-        if not self.current_stream.has_key(sid):
+        @param sid: id of client.xep_0065_current_stream"""
+        assert(profile)
+        client = self.host.getClient(profile)
+        if not client:
+            warning(_("Client no more in cache"))
+            return
+        if not client.xep_0065_current_stream.has_key(sid):
             warning(_("kill id called on a non existant id"))
             return
-        if self.current_stream[sid].has_key("observer_cb"):
-            xmlstream = self.current_stream[sid]["xmlstream"]
-            xmlstream.removeObserver(self.current_stream[sid]["event_data"], self.current_stream[sid]["observer_cb"])
-        if self.current_stream[sid]['timer'].active():
-            self.current_stream[sid]['timer'].cancel()
-        if self.current_stream[sid].has_key("size"):
-            self.host.removeProgressCB(sid)
+        if client.xep_0065_current_stream[sid].has_key("observer_cb"):
+            xmlstream = client.xep_0065_current_stream[sid]["xmlstream"]
+            xmlstream.removeObserver(client.xep_0065_current_stream[sid]["event_data"], client.xep_0065_current_stream[sid]["observer_cb"])
+        if client.xep_0065_current_stream[sid]['timer'].active():
+            client.xep_0065_current_stream[sid]['timer'].cancel()
+        if client.xep_0065_current_stream[sid].has_key("size"):
+            self.host.removeProgressCB(sid, profile)
        
-        file_obj = self.current_stream[sid]['file_obj']
-        success_cb = self.current_stream[sid]['success_cb']
-        failure_cb = self.current_stream[sid]['failure_cb']
+        file_obj = client.xep_0065_current_stream[sid]['file_obj']
+        success_cb = client.xep_0065_current_stream[sid]['success_cb']
+        failure_cb = client.xep_0065_current_stream[sid]['failure_cb']
         
-        del self.current_stream[sid]
-        if self.hash_sid_map.has_key(sid):
-            del self.hash_sid_map[sid]
+        session_hash = client.xep_0065_current_stream[sid].get('hash')
+        del client.xep_0065_current_stream[sid]
+        if session_hash in self.hash_sid_map:
+            #FIXME: check that self.hash_sid_map is correctly cleaned in all cases (timeout, normal flow, etc).
+            del self.hash_sid_map[session_hash]
 
         if success:
-            success_cb(sid, file_obj, NS_BS)
+            success_cb(sid, file_obj, NS_BS, profile)
         else:
-            failure_cb(sid, file_obj, NS_BS, failure_reason)
+            failure_cb(sid, file_obj, NS_BS, failure_reason, profile)
 
-    def startStream(self, file_obj, to_jid, sid, length, successCb, failureCb, size = None, profile='@NONE@'):
+    def startStream(self, file_obj, to_jid, sid, length, successCb, failureCb, size = None, profile=None):
         """Launch the stream workflow
         @param file_obj: file_obj to send
         @param to_jid: JID of the recipient
@@ -531,16 +553,21 @@
         @param successCb: method to call when stream successfuly finished
         @param failureCb: method to call when something goes wrong
         @param profile: %(doc_profile)s"""
+        assert(profile)
+        client = self.host.getClient(profile)
+        if not client:
+            error(_("Unknown profile, this should not happen"))
+            raise ProfileNotInCacheError
+        
         if length != None:
             error(_('stream length not managed yet'))
             return;
-        profile_jid, xmlstream = self.host.getJidNStream(profile)
-        if not profile_jid or not xmlstream:
-            error(_("Unknown profile, this should not happen"))
-            return;
-        data = self.current_stream[sid] = {}
-        data["profile"] = profile
-        data["timer"] = reactor.callLater(TIMEOUT, self._timeOut, sid)
+        
+        profile_jid = client.jid
+        xmlstream = client.xmlstream
+        
+        data = client.xep_0065_current_stream[sid] = {}
+        data["timer"] = reactor.callLater(TIMEOUT, self._timeOut, sid, profile)
         data["file_obj"] = file_obj
         data["from"] = profile_jid
         data["to"] = to_jid
@@ -548,11 +575,11 @@
         data["failure_cb"] = failureCb
         data["xmlstream"] = xmlstream
         data["hash"] = calculateHash(profile_jid, to_jid, sid)
-        self.hash_sid_map[data["hash"]] = sid
+        self.hash_sid_map[data["hash"]] = (sid, profile)
         if size:
             data["size"] = size
-            self.host.registerProgressCB(sid, self.getProgress)
-        iq_elt = client.IQ(xmlstream,'set')
+            self.host.registerProgressCB(sid, self.getProgress, profile)
+        iq_elt = jabber_client.IQ(xmlstream,'set')
         iq_elt["from"] = profile_jid.full()
         iq_elt["to"] = to_jid.full()
         query_elt = iq_elt.addElement('query', NS_BS)
@@ -570,20 +597,21 @@
         streamhost['port'] = self.host.memory.getParamA("Proxy port", "File Transfer", profile_key=profile)
         streamhost['jid'] = self.host.memory.getParamA("Proxy", "File Transfer", profile_key=profile)
 
-        iq_elt.addCallback(self.iqResult, sid)
+        iq_elt.addCallback(self.iqResult, sid, profile)
         iq_elt.send()
 
-    def iqResult(self, sid, iq_elt):
+    def iqResult(self, sid, profile, iq_elt):
         """Called when the result of open iq is received"""
         if iq_elt["type"] == "error":
             warning(_("Transfer failed"))
             return
-        
+        client = self.host.getClient(profile)
+        if not client:
+            raise ProfileNotInCacheError
         try:
-            data = self.current_stream[sid]
+            data = client.xep_0065_current_stream[sid]
             file_obj = data["file_obj"]
             timer = data["timer"]
-            profile = data["profile"]
         except KeyError:
             error(_("Internal error, can't do transfer"))
             return
@@ -607,15 +635,17 @@
             if proxy_jid != streamhost_jid:
                 warning(_("Proxy jid is not the same as in parameters, this should not happen"))
                 return
-            factory = Socks5ClientFactory(self.current_stream, sid, None, self.activateProxyStream, self._killId, True)
+            factory = Socks5ClientFactory(client.xep_0065_current_stream, sid, None, self.activateProxyStream, lambda sid, success, profile: self._killId(sid, success, profile=profile), True, profile)
             reactor.connectTCP(proxy_host, int(proxy_port), factory)
         else:
             data["start_transfer_cb"](file_obj) #We now activate the stream
 
-    def activateProxyStream(self, sid, iq_id, start_transfer_cb):
+    def activateProxyStream(self, sid, iq_id, start_transfer_cb, profile):
         debug(_("activating stream"))
-        data = self.current_stream[sid]
-        profile = data['profile']
+        client = self.host.getClient(profile)
+        if not client:
+            raise ProfileNotInCacheError
+        data = client.xep_0065_current_stream[sid]
         profile_jid, xmlstream = self.host.getJidNStream(profile)
 
         iq_elt = client.IQ(xmlstream,'set')
@@ -634,22 +664,26 @@
         else:
             start_transfer_cb(file_obj)
 
-    def prepareToReceive(self, from_jid, sid, file_obj, size, success_cb, failure_cb):
+    def prepareToReceive(self, from_jid, sid, file_obj, size, success_cb, failure_cb, profile):
         """Called when a bytestream is imminent
         @param from_jid: jid of the sender
         @param sid: Stream id
         @param file_obj: File object where data will be written
         @param size: full size of the data, or None if unknown
         @param success_cb: method to call when successfuly finished
-        @param failure_cb: method to call when something goes wrong"""
-        data = self.current_stream[sid] = {}
+        @param failure_cb: method to call when something goes wrong
+        @param profile: %(doc_profile)s"""
+        client = self.host.getClient(profile)
+        if not client:
+            raise ProfileNotInCacheError
+        data = client.xep_0065_current_stream[sid] = {}
         data["from"] = from_jid
         data["file_obj"] = file_obj
         data["seq"] = -1
         if size:
             data["size"] = size
-            self.host.registerProgressCB(sid, self.getProgress)
-        data["timer"] = reactor.callLater(TIMEOUT, self._timeOut, sid)
+            self.host.registerProgressCB(sid, self.getProgress, profile)
+        data["timer"] = reactor.callLater(TIMEOUT, self._timeOut, sid, profile)
         data["success_cb"] = success_cb
         data["failure_cb"] = failure_cb
     
@@ -657,20 +691,26 @@
     def streamQuery(self, iq_elt, profile):
         """Get file using byte stream"""
         debug(_("BS stream query"))
-        profile_jid, xmlstream = self.host.getJidNStream(profile)
+        client = self.host.getClient(profile)
+        
+        if not client:
+            raise ProfileNotInCacheError
+        
+        xmlstream = client.xmlstream
+
         iq_elt.handled = True
         query_elt = iq_elt.firstChildElement()
         sid = query_elt.getAttribute("sid")
         streamhost_elts = filter(lambda elt: elt.name == 'streamhost', query_elt.elements())
         
-        if not sid in self.current_stream:
+        if not sid in client.xep_0065_current_stream:
             warning(_("Ignoring unexpected BS transfer: %s" % sid))
             self.sendNotAcceptableError(iq_elt['id'], iq_elt['from'], xmlstream)
             return
 
-        self.current_stream[sid]['timer'].cancel()
-        self.current_stream[sid]["to"] = jid.JID(iq_elt["to"])
-        self.current_stream[sid]["xmlstream"] = xmlstream
+        client.xep_0065_current_stream[sid]['timer'].cancel()
+        client.xep_0065_current_stream[sid]["to"] = jid.JID(iq_elt["to"])
+        client.xep_0065_current_stream[sid]["xmlstream"] = xmlstream
 
         if not streamhost_elts:
             warning(_("No streamhost found in stream query %s" % sid))
@@ -686,16 +726,19 @@
             self.sendBadRequestError(iq_elt['id'], iq_elt['from'], xmlstream)
             return
 
-        self.current_stream[sid]["streamhost"] = (sh_host, sh_port, sh_jid)
+        client.xep_0065_current_stream[sid]["streamhost"] = (sh_host, sh_port, sh_jid)
 
         info (_("Stream proposed: host=[%(host)s] port=[%(port)s]") % {'host':sh_host, 'port':sh_port})
-        factory = Socks5ClientFactory(self.current_stream, sid, iq_elt["id"], self.activateStream, self._killId)
+        factory = Socks5ClientFactory(client.xep_0065_current_stream, sid, iq_elt["id"], self.activateStream, lambda sid, success, profile: self._killId(sid, success, profile=profile), profile=profile)
         reactor.connectTCP(sh_host, int(sh_port), factory)
                 
-    def activateStream(self, sid, iq_id):
+    def activateStream(self, sid, iq_id, profile):
+        client = self.host.getClient(profile)
+        if not client:
+            raise ProfileNotInCacheError
         debug(_("activating stream"))
         result = domish.Element((None, 'iq'))
-        data = self.current_stream[sid]
+        data = client.xep_0065_current_stream[sid]
         result['type'] = 'result'
         result['id'] = iq_id
         result['from'] = data["to"].full()
@@ -769,7 +812,7 @@
             if not proxy_ent:
                 debug(_("No proxy found on this server"))
                 return
-            iq_elt = client.IQ(self.parent.xmlstream,'get')
+            iq_elt = jabber_client.IQ(self.parent.xmlstream,'get')
             iq_elt["to"] = proxy_ent.full()
             query_elt = iq_elt.addElement('query', NS_BS)
             iq_elt.addCallback(self._proxyDataResult)