Mercurial > libervia-backend
diff src/plugins/plugin_xep_0065.py @ 398:cb0285372818
File transfer:
- proxy managed in XEP-0065 (Socks5 bytestream)
- bug: fixed a bad id used during stream negociation
author | Goffi <goffi@goffi.org> |
---|---|
date | Wed, 05 Oct 2011 16:49:57 +0200 |
parents | 8f3551ceee17 |
children | c513328ade9d |
line wrap: on
line diff
--- a/src/plugins/plugin_xep_0065.py Wed Oct 05 16:48:25 2011 +0200 +++ b/src/plugins/plugin_xep_0065.py Wed Oct 05 16:49:57 2011 +0200 @@ -296,8 +296,12 @@ self.loseConnection() return - self.state = STATE_TARGET_READY - self.factory.activateCb(self.sid, self.factory.iq_id) + if self.factory.proxy: + self.state = STATE_READY + self.factory.activateCb(self.sid, self.factory.iq_id, self.startTransfer) + else: + self.state = STATE_TARGET_READY + self.factory.activateCb(self.sid, self.factory.iq_id) except struct.error, why: return None @@ -405,12 +409,20 @@ class Socks5ClientFactory(protocol.ClientFactory): protocol = SOCKSv5 - def __init__(self, current_stream, sid, iq_id, activateCb, finishedCb): + def __init__(self, current_stream, sid, iq_id, activateCb, finishedCb, proxy=False): + """Init the Client Factory + @param current_stream: current streams data + @param sid: Session ID + @param iq_id: iq id used to initiate the stream + @param activateCb: method to call to activate the stream + @param finishedCb: method to call when the stream session is finished + @param proxy: True if we are connecting throught a proxy (and we are a requester)""" self.data = current_stream[sid] self.sid = sid self.iq_id = iq_id self.activateCb = activateCb self.finishedCb = finishedCb + self.proxy = proxy def startedConnecting(self, connector): debug (_("Socks 5 client connection started")) @@ -432,6 +444,13 @@ <param name="Port" value="28915" type="string" /> </category> </general> + <individual> + <category name="File Transfer"> + <param name="Proxy" value="" type="string" /> + <param name="Proxy host" value="" type="string" /> + <param name="Proxy port" value="" type="string" /> + </category> + </individual> </params> """ @@ -503,19 +522,6 @@ else: failure_cb(sid, file_obj, NS_BS, failure_reason) - def setData(self, data, id): - self.data = data - self.transfer_id = id - - def sendFile(self, id, filepath, size): - #lauching socks5 requester - debug(_("Launching socks5 requester")) - self.server_factory.protocol.mode = "requester" - self.server_factory.protocol.filepath = filepath - self.server_factory.protocol.filesize = size - self.server_factory.protocol.transfer_id = id - - def startStream(self, file_obj, to_jid, sid, length, successCb, failureCb, size = None, profile='@NONE@'): """Launch the stream workflow @param file_obj: file_obj to send @@ -529,9 +535,14 @@ error(_('stream length not managed yet')) return; profile_jid, xmlstream = self.host.getJidNStream(profile) + if not profile_jid or not xmlstream: + error(_("Unknown profile, this should not happen")) + return; data = self.current_stream[sid] = {} + data["profile"] = profile data["timer"] = reactor.callLater(TIMEOUT, self._timeOut, sid) data["file_obj"] = file_obj + data["from"] = profile_jid data["to"] = to_jid data["success_cb"] = successCb data["failure_cb"] = failureCb @@ -547,10 +558,18 @@ query_elt = iq_elt.addElement('query', NS_BS) query_elt['mode'] = 'tcp' query_elt['sid'] = sid + #first streamhost: direct connection streamhost = query_elt.addElement('streamhost') - streamhost['host'] = "127.0.0.1" #self.host.memory.getParamA("IP", "File Transfer") + streamhost['host'] = self.host.memory.getParamA("IP", "File Transfer") streamhost['port'] = self.host.memory.getParamA("Port", "File Transfer") streamhost['jid'] = profile_jid.full() + + #second streamhost: mediated connection, using proxy + streamhost = query_elt.addElement('streamhost') + streamhost['host'] = self.host.memory.getParamA("Proxy host", "File Transfer", profile_key=profile) + streamhost['port'] = self.host.memory.getParamA("Proxy port", "File Transfer", profile_key=profile) + streamhost['jid'] = self.host.memory.getParamA("Proxy", "File Transfer", profile_key=profile) + iq_elt.addCallback(self.iqResult, sid) iq_elt.send() @@ -560,20 +579,60 @@ warning(_("Transfer failed")) return - try: + try: data = self.current_stream[sid] - callback = data["start_transfer_cb"] file_obj = data["file_obj"] timer = data["timer"] + profile = data["profile"] except KeyError: error(_("Internal error, can't do transfer")) return if timer.active(): timer.cancel() + + profile_jid, xmlstream = self.host.getJidNStream(profile) + query_elt = iq_elt.firstChildElement() + streamhost_elts = filter(lambda elt: elt.name == 'streamhost-used', query_elt.elements()) + if not streamhost_elts: + warning(_("No streamhost found in stream query")) + return - callback(file_obj) + streamhost_jid = streamhost_elts[0]['jid'] + if streamhost_jid != profile_jid.full(): + debug(_("A proxy server is used")) + proxy_host = self.host.memory.getParamA("Proxy host", "File Transfer", profile_key=profile) + proxy_port = self.host.memory.getParamA("Proxy port", "File Transfer", profile_key=profile) + proxy_jid = self.host.memory.getParamA("Proxy", "File Transfer", profile_key=profile) + if proxy_jid != streamhost_jid: + warning(_("Proxy jid is not the same as in parameters, this should not happen")) + return + factory = Socks5ClientFactory(self.current_stream, sid, None, self.activateProxyStream, self._killId, True) + reactor.connectTCP(proxy_host, int(proxy_port), factory) + else: + data["start_transfer_cb"](file_obj) #We now activate the stream + def activateProxyStream(self, sid, iq_id, start_transfer_cb): + debug(_("activating stream")) + data = self.current_stream[sid] + profile = data['profile'] + profile_jid, xmlstream = self.host.getJidNStream(profile) + + iq_elt = client.IQ(xmlstream,'set') + iq_elt["from"] = profile_jid.full() + iq_elt["to"] = self.host.memory.getParamA("Proxy", "File Transfer", profile_key=profile) + query_elt = iq_elt.addElement('query', NS_BS) + query_elt['sid'] = sid + query_elt.addElement('activate', content=data['to'].full()) + iq_elt.addCallback(self.proxyResult, sid, start_transfer_cb, data['file_obj']) + iq_elt.send() + + def proxyResult(self, sid, start_transfer_cb, file_obj, iq_elt): + if iq_elt['type'] == 'error': + warning(_("Can't activate the proxy stream")) + return + else: + start_transfer_cb(file_obj) def prepareToReceive(self, from_jid, sid, file_obj, size, success_cb, failure_cb): """Called when a bytestream is imminent @@ -680,9 +739,47 @@ def __init__(self, plugin_parent): self.plugin_parent = plugin_parent self.host = plugin_parent.host + + def _proxyDataResult(self, iq_elt): + """Called with the informations about proxy according to XEP-0065 #4 + Params should be filled with these infos""" + if iq_elt["type"] == "error": + warning(_("Can't determine proxy informations")) + return + query_elt = iq_elt.firstChildElement() + if query_elt.name != "query": + warning(_("Bad answer received from proxy")) + return + streamhost_elts = filter(lambda elt: elt.name == 'streamhost', query_elt.elements()) + if not streamhost_elts: + warning(_("No streamhost found in stream query")) + return + if len(streamhost_elts) != 1: + warning(_("Multiple streamhost elements in proxy not managed, keeping only the first one")) + streamhost_elt = streamhost_elts[0] + proxy = self.host.memory.setParam("Proxy", streamhost_elt.getAttribute("jid",""), "File Transfer", self.parent.profile) + proxy = self.host.memory.setParam("Proxy host", streamhost_elt.getAttribute("host",""), "File Transfer", self.parent.profile) + proxy = self.host.memory.setParam("Proxy port", streamhost_elt.getAttribute("port",""), "File Transfer", self.parent.profile) + def connectionInitialized(self): + def after_init(ignore): + proxy_ent = self.host.memory.getServerServiceEntity("proxy", "bytestreams", self.parent.profile) + if not proxy_ent: + debug(_("No proxy found on this server")) + return + iq_elt = client.IQ(self.parent.xmlstream,'get') + iq_elt["to"] = proxy_ent.full() + query_elt = iq_elt.addElement('query', NS_BS) + iq_elt.addCallback(self._proxyDataResult) + iq_elt.send() + + self.xmlstream.addObserver(BS_REQUEST, self.plugin_parent.streamQuery, profile = self.parent.profile) + proxy = self.host.memory.getParamA("Proxy", "File Transfer", profile_key = self.parent.profile) + if not proxy: + self.parent.client_initialized.addCallback(after_init) + def getDiscoInfo(self, requestor, target, nodeIdentifier=''):