Mercurial > libervia-backend
changeset 398:cb0285372818
File transfer:
- proxy managed in XEP-0065 (Socks5 bytestream)
- bug: fixed a bad id used during stream negociation
author | Goffi <goffi@goffi.org> |
---|---|
date | Wed, 05 Oct 2011 16:49:57 +0200 |
parents | ccfd69d090c3 |
children | 3ed53803b3b3 |
files | src/plugins/plugin_xep_0065.py src/plugins/plugin_xep_0095.py src/plugins/plugin_xep_0096.py src/tools/memory.py |
diffstat | 4 files changed, 138 insertions(+), 42 deletions(-) [+] |
line wrap: on
line diff
--- a/src/plugins/plugin_xep_0065.py Wed Oct 05 16:48:25 2011 +0200 +++ b/src/plugins/plugin_xep_0065.py Wed Oct 05 16:49:57 2011 +0200 @@ -296,8 +296,12 @@ self.loseConnection() return - self.state = STATE_TARGET_READY - self.factory.activateCb(self.sid, self.factory.iq_id) + if self.factory.proxy: + self.state = STATE_READY + self.factory.activateCb(self.sid, self.factory.iq_id, self.startTransfer) + else: + self.state = STATE_TARGET_READY + self.factory.activateCb(self.sid, self.factory.iq_id) except struct.error, why: return None @@ -405,12 +409,20 @@ class Socks5ClientFactory(protocol.ClientFactory): protocol = SOCKSv5 - def __init__(self, current_stream, sid, iq_id, activateCb, finishedCb): + def __init__(self, current_stream, sid, iq_id, activateCb, finishedCb, proxy=False): + """Init the Client Factory + @param current_stream: current streams data + @param sid: Session ID + @param iq_id: iq id used to initiate the stream + @param activateCb: method to call to activate the stream + @param finishedCb: method to call when the stream session is finished + @param proxy: True if we are connecting throught a proxy (and we are a requester)""" self.data = current_stream[sid] self.sid = sid self.iq_id = iq_id self.activateCb = activateCb self.finishedCb = finishedCb + self.proxy = proxy def startedConnecting(self, connector): debug (_("Socks 5 client connection started")) @@ -432,6 +444,13 @@ <param name="Port" value="28915" type="string" /> </category> </general> + <individual> + <category name="File Transfer"> + <param name="Proxy" value="" type="string" /> + <param name="Proxy host" value="" type="string" /> + <param name="Proxy port" value="" type="string" /> + </category> + </individual> </params> """ @@ -503,19 +522,6 @@ else: failure_cb(sid, file_obj, NS_BS, failure_reason) - def setData(self, data, id): - self.data = data - self.transfer_id = id - - def sendFile(self, id, filepath, size): - #lauching socks5 requester - debug(_("Launching socks5 requester")) - self.server_factory.protocol.mode = "requester" - self.server_factory.protocol.filepath = filepath - self.server_factory.protocol.filesize = size - self.server_factory.protocol.transfer_id = id - - def startStream(self, file_obj, to_jid, sid, length, successCb, failureCb, size = None, profile='@NONE@'): """Launch the stream workflow @param file_obj: file_obj to send @@ -529,9 +535,14 @@ error(_('stream length not managed yet')) return; profile_jid, xmlstream = self.host.getJidNStream(profile) + if not profile_jid or not xmlstream: + error(_("Unknown profile, this should not happen")) + return; data = self.current_stream[sid] = {} + data["profile"] = profile data["timer"] = reactor.callLater(TIMEOUT, self._timeOut, sid) data["file_obj"] = file_obj + data["from"] = profile_jid data["to"] = to_jid data["success_cb"] = successCb data["failure_cb"] = failureCb @@ -547,10 +558,18 @@ query_elt = iq_elt.addElement('query', NS_BS) query_elt['mode'] = 'tcp' query_elt['sid'] = sid + #first streamhost: direct connection streamhost = query_elt.addElement('streamhost') - streamhost['host'] = "127.0.0.1" #self.host.memory.getParamA("IP", "File Transfer") + streamhost['host'] = self.host.memory.getParamA("IP", "File Transfer") streamhost['port'] = self.host.memory.getParamA("Port", "File Transfer") streamhost['jid'] = profile_jid.full() + + #second streamhost: mediated connection, using proxy + streamhost = query_elt.addElement('streamhost') + streamhost['host'] = self.host.memory.getParamA("Proxy host", "File Transfer", profile_key=profile) + streamhost['port'] = self.host.memory.getParamA("Proxy port", "File Transfer", profile_key=profile) + streamhost['jid'] = self.host.memory.getParamA("Proxy", "File Transfer", profile_key=profile) + iq_elt.addCallback(self.iqResult, sid) iq_elt.send() @@ -560,20 +579,60 @@ warning(_("Transfer failed")) return - try: + try: data = self.current_stream[sid] - callback = data["start_transfer_cb"] file_obj = data["file_obj"] timer = data["timer"] + profile = data["profile"] except KeyError: error(_("Internal error, can't do transfer")) return if timer.active(): timer.cancel() + + profile_jid, xmlstream = self.host.getJidNStream(profile) + query_elt = iq_elt.firstChildElement() + streamhost_elts = filter(lambda elt: elt.name == 'streamhost-used', query_elt.elements()) + if not streamhost_elts: + warning(_("No streamhost found in stream query")) + return - callback(file_obj) + streamhost_jid = streamhost_elts[0]['jid'] + if streamhost_jid != profile_jid.full(): + debug(_("A proxy server is used")) + proxy_host = self.host.memory.getParamA("Proxy host", "File Transfer", profile_key=profile) + proxy_port = self.host.memory.getParamA("Proxy port", "File Transfer", profile_key=profile) + proxy_jid = self.host.memory.getParamA("Proxy", "File Transfer", profile_key=profile) + if proxy_jid != streamhost_jid: + warning(_("Proxy jid is not the same as in parameters, this should not happen")) + return + factory = Socks5ClientFactory(self.current_stream, sid, None, self.activateProxyStream, self._killId, True) + reactor.connectTCP(proxy_host, int(proxy_port), factory) + else: + data["start_transfer_cb"](file_obj) #We now activate the stream + def activateProxyStream(self, sid, iq_id, start_transfer_cb): + debug(_("activating stream")) + data = self.current_stream[sid] + profile = data['profile'] + profile_jid, xmlstream = self.host.getJidNStream(profile) + + iq_elt = client.IQ(xmlstream,'set') + iq_elt["from"] = profile_jid.full() + iq_elt["to"] = self.host.memory.getParamA("Proxy", "File Transfer", profile_key=profile) + query_elt = iq_elt.addElement('query', NS_BS) + query_elt['sid'] = sid + query_elt.addElement('activate', content=data['to'].full()) + iq_elt.addCallback(self.proxyResult, sid, start_transfer_cb, data['file_obj']) + iq_elt.send() + + def proxyResult(self, sid, start_transfer_cb, file_obj, iq_elt): + if iq_elt['type'] == 'error': + warning(_("Can't activate the proxy stream")) + return + else: + start_transfer_cb(file_obj) def prepareToReceive(self, from_jid, sid, file_obj, size, success_cb, failure_cb): """Called when a bytestream is imminent @@ -680,9 +739,47 @@ def __init__(self, plugin_parent): self.plugin_parent = plugin_parent self.host = plugin_parent.host + + def _proxyDataResult(self, iq_elt): + """Called with the informations about proxy according to XEP-0065 #4 + Params should be filled with these infos""" + if iq_elt["type"] == "error": + warning(_("Can't determine proxy informations")) + return + query_elt = iq_elt.firstChildElement() + if query_elt.name != "query": + warning(_("Bad answer received from proxy")) + return + streamhost_elts = filter(lambda elt: elt.name == 'streamhost', query_elt.elements()) + if not streamhost_elts: + warning(_("No streamhost found in stream query")) + return + if len(streamhost_elts) != 1: + warning(_("Multiple streamhost elements in proxy not managed, keeping only the first one")) + streamhost_elt = streamhost_elts[0] + proxy = self.host.memory.setParam("Proxy", streamhost_elt.getAttribute("jid",""), "File Transfer", self.parent.profile) + proxy = self.host.memory.setParam("Proxy host", streamhost_elt.getAttribute("host",""), "File Transfer", self.parent.profile) + proxy = self.host.memory.setParam("Proxy port", streamhost_elt.getAttribute("port",""), "File Transfer", self.parent.profile) + def connectionInitialized(self): + def after_init(ignore): + proxy_ent = self.host.memory.getServerServiceEntity("proxy", "bytestreams", self.parent.profile) + if not proxy_ent: + debug(_("No proxy found on this server")) + return + iq_elt = client.IQ(self.parent.xmlstream,'get') + iq_elt["to"] = proxy_ent.full() + query_elt = iq_elt.addElement('query', NS_BS) + iq_elt.addCallback(self._proxyDataResult) + iq_elt.send() + + self.xmlstream.addObserver(BS_REQUEST, self.plugin_parent.streamQuery, profile = self.parent.profile) + proxy = self.host.memory.getParamA("Proxy", "File Transfer", profile_key = self.parent.profile) + if not proxy: + self.parent.client_initialized.addCallback(after_init) + def getDiscoInfo(self, requestor, target, nodeIdentifier=''):
--- a/src/plugins/plugin_xep_0095.py Wed Oct 05 16:48:25 2011 +0200 +++ b/src/plugins/plugin_xep_0095.py Wed Oct 05 16:49:57 2011 +0200 @@ -75,13 +75,13 @@ info (_("XEP-0095 Stream initiation")) iq_el.handled=True si_el = iq_el.firstChildElement() - si_id = iq_el.getAttribute('id') + si_id = si_el.getAttribute('id') si_mime_type = iq_el.getAttribute('mime-type', 'application/octet-stream') si_profile = si_el.getAttribute('profile') si_profile_key = si_profile[len(SI_PROFILE_HEADER):] if si_profile.startswith(SI_PROFILE_HEADER) else si_profile if self.si_profiles.has_key(si_profile_key): #We know this SI profile, we call the callback - self.si_profiles[si_profile_key](iq_el['from'], si_id, si_mime_type, si_el, profile) + self.si_profiles[si_profile_key](iq_el['id'], iq_el['from'], si_id, si_mime_type, si_el, profile) else: #We don't know this profile, we send an error self.sendBadProfileError(iq_el['id'], iq_el['from'], profile)
--- a/src/plugins/plugin_xep_0096.py Wed Oct 05 16:48:25 2011 +0200 +++ b/src/plugins/plugin_xep_0096.py Wed Oct 05 16:49:57 2011 +0200 @@ -70,8 +70,9 @@ except KeyError: warning(_("kill id called on a non existant approval id")) - def transferRequest(self, from_jid, si_id, si_mime_type, si_el, profile): + def transferRequest(self, iq_id, from_jid, si_id, si_mime_type, si_el, profile): """Called when a file transfer is requested + @param iq_id: id of the iq request @param from_jid: jid of the sender @param si_id: Stream Initiation session id @param si_mime_type: Mime type of the file (or default "application/octet-stream" if unknown) @@ -102,7 +103,7 @@ can_range = True else: warning(_("No file element found")) - self.host.plugins["XEP-0095"].sendBadRequestError(si_id, from_jid, profile) + self.host.plugins["XEP-0095"].sendBadRequestError(iq_id, from_jid, profile) return if feature_elts: @@ -112,19 +113,19 @@ stream_method = self.host.plugins["XEP-0020"].negociate(feature_el, 'stream-method',self.managed_stream_m) except KeyError: warning(_("No stream method found")) - self.host.plugins["XEP-0095"].sendBadRequestError(si_id, from_jid, profile) + self.host.plugins["XEP-0095"].sendBadRequestError(iq_id, from_jid, profile) return if not stream_method: warning(_("Can't find a valid stream method")) - self.host.plugins["XEP-0095"].sendFailedError(si_id, from_jid, profile) + self.host.plugins["XEP-0095"].sendFailedError(iq_id, from_jid, profile) return else: warning(_("No feature element found")) - self.host.plugins["XEP-0095"].sendBadRequestError(si_id, from_jid, profile) + self.host.plugins["XEP-0095"].sendBadRequestError(iq_id, from_jid, profile) return #if we are here, the transfer can start, we just need user's agreement - data={ "filename":filename, "from":from_jid, "size":file_size, "date":file_date, "hash":file_hash, "desc":file_desc, "can_range": str(can_range) } + data={ "filename":filename, "id": iq_id, "from":from_jid, "size":file_size, "date":file_date, "hash":file_hash, "desc":file_desc, "can_range": str(can_range) } self._waiting_for_approval[si_id] = [data, reactor.callLater(300, self._kill_id, si_id), stream_method, [], profile] self.host.askConfirmation(si_id, "FILE_TRANSFER", data, self.confirmationCB) @@ -137,12 +138,12 @@ @return: File Object""" return open(dest_path, "ab" if can_range else "wb") - def confirmationCB(self, id, accepted, frontend_data): + def confirmationCB(self, sid, accepted, frontend_data): """Called on confirmation answer - @param id: file transfer session id + @param sid: file transfer session id @param accepted: True if file transfer is accepted @param frontend_data: data sent by frontend""" - data, timeout, stream_method, failed_methods, profile = self._waiting_for_approval[id] + data, timeout, stream_method, failed_methods, profile = self._waiting_for_approval[sid] can_range = data['can_range'] == "True" range_offset = 0 if accepted: @@ -152,20 +153,19 @@ dest_path = frontend_data['dest_path'] except KeyError: error(_('dest path not found in frontend_data')) - del(self._waiting_for_approval[id]) + del(self._waiting_for_approval[sid]) return if stream_method == self.host.plugins["XEP-0065"].NAMESPACE: file_obj = self._getFileObject(dest_path, can_range) range_offset = file_obj.tell() - self.host.plugins["XEP-0065"].prepareToReceive(jid.JID(data['from']), id, file_obj, int(data["size"]), self._transferSucceeded, self._transferFailed) - #self.host.plugins["XEP-0065"].setData(data, id) + self.host.plugins["XEP-0065"].prepareToReceive(jid.JID(data['from']), sid, file_obj, int(data["size"]), self._transferSucceeded, self._transferFailed) elif stream_method == self.host.plugins["XEP-0047"].NAMESPACE: file_obj = self._getFileObject(dest_path, can_range) range_offset = file_obj.tell() - self.host.plugins["XEP-0047"].prepareToReceive(jid.JID(data['from']), id, file_obj, int(data["size"]), self._transferSucceeded, self._transferFailed) + self.host.plugins["XEP-0047"].prepareToReceive(jid.JID(data['from']), sid, file_obj, int(data["size"]), self._transferSucceeded, self._transferFailed) else: error(_("Unknown stream method, this should not happen at this stage, cancelling transfer")) - del(self._waiting_for_approval[id]) + del(self._waiting_for_approval[sid]) return #we can send the iq result @@ -177,11 +177,11 @@ range_elt['offset'] = str(range_offset) #TODO: manage range length misc_elts.append(range_elt) - self.host.plugins["XEP-0095"].acceptStream(id, data['from'], feature_elt, misc_elts, profile) + self.host.plugins["XEP-0095"].acceptStream(data["id"], data['from'], feature_elt, misc_elts, profile) else: - debug (_("Transfer [%s] refused"), id) - self.host.plugins["XEP-0095"].sendRejectedError (id, data['from'], profile=profile) - del(self._waiting_for_approval[id]) + debug (_("Transfer [%s] refused"), sid) + self.host.plugins["XEP-0095"].sendRejectedError (data["id"], data['from'], profile=profile) + del(self._waiting_for_approval[sid]) def _transferSucceeded(self, sid, file_obj, stream_method): """Called by the stream method when transfer successfuly finished @@ -289,4 +289,4 @@ def sendFailureCb(self, sid, file_obj, stream_method, reason): file_obj.close() - warning(_('Transfer %(id)s failed with stream method %(s_method)s') % { 'id': sid, s_method: stream_method }) + warning(_('Transfer %(id)s failed with stream method %(s_method)s') % { 'id': sid, "s_method": stream_method })
--- a/src/tools/memory.py Wed Oct 05 16:48:25 2011 +0200 +++ b/src/tools/memory.py Wed Oct 05 16:49:57 2011 +0200 @@ -667,7 +667,6 @@ """Return list of contacts for given profile @param profile_key: profile key @return list of [contact, attr, groups]""" - debug ("Memory getContact OK (%s)", self.contacts) profile = self.getProfileName(profile_key) if not profile: error(_('Asking contacts for a non-existant profile'))