Mercurial > libervia-backend
diff src/plugins/plugin_xep_0065.py @ 538:2c4016921403
core, frontends, bridgen plugins: fixed methods which were unproperly managing multi-profiles
- added profile argument to askConfirmation, actionResult, actionResultExt, entityDataUpdated, confirmationAnswer, getProgress
- core, frontends: fixed calls/signals according to new bridge API
- user of proper profile namespace for progression indicators and dialogs
- memory: getParam* now return bool when param type is bool
- memory: added getStringParam* to return string instead of typed value
- core, memory, storage, quick_frontend: getHistory now manage properly multi-profiles
- plugins XEP-0047, XEP-0054, XEP-0065, XEP-0077, XEP-0096; multi-profiles proper handling
author | Goffi <goffi@goffi.org> |
---|---|
date | Sat, 10 Nov 2012 16:38:16 +0100 |
parents | a31abb97310d |
children | ca13633d3b6b |
line wrap: on
line diff
--- a/src/plugins/plugin_xep_0065.py Sun Nov 04 23:53:26 2012 +0100 +++ b/src/plugins/plugin_xep_0065.py Sat Nov 10 16:38:16 2012 +0100 @@ -58,13 +58,13 @@ from logging import debug, info, warning, error from twisted.internet import protocol, reactor from twisted.internet import error as jab_error -from twisted.words.protocols.jabber import client, jid +from twisted.words.protocols.jabber import jid, client as jabber_client from twisted.protocols.basic import FileSender from twisted.words.xish import domish from twisted.web.client import getPage +from sat.core.exceptions import ProfileNotInCacheError import struct -import urllib -import hashlib, pdb +import hashlib from zope.interface import implements @@ -298,10 +298,10 @@ if self.factory.proxy: self.state = STATE_READY - self.factory.activateCb(self.sid, self.factory.iq_id, self.startTransfer) + self.factory.activateCb(self.sid, self.factory.iq_id, self.startTransfer, self.profile) else: self.state = STATE_TARGET_READY - self.factory.activateCb(self.sid, self.factory.iq_id) + self.factory.activateCb(self.sid, self.factory.iq_id, self.profile) except struct.error, why: return None @@ -311,6 +311,7 @@ if isinstance(self.factory, Socks5ClientFactory): self.sid = self.factory.sid + self.profile = self.factory.profile self.data = self.factory.data self.state = STATE_TARGET_INITIAL self._startNegotiation() @@ -318,13 +319,16 @@ def connectRequested(self, addr, port): debug("connectRequested") - # Check that this session if expected + # Check that this session is expected if not self.factory.hash_sid_map.has_key(addr): #no: we refuse it - self.sendErrorReply(socks5.REPLY_CONN_REFUSED) + self.sendErrorReply(REPLY_CONN_REFUSED) return - self.sid = self.factory.hash_sid_map[addr] - self.factory.current_stream[self.sid]["start_transfer_cb"] = self.startTransfer + self.sid, self.profile = self.factory.hash_sid_map[addr] + client = self.factory.host.getClient(self.profile) + if not client: + raise ProfileNotInCacheError + client.xep_0065_current_stream[self.sid]["start_transfer_cb"] = self.startTransfer self.connectCompleted(addr, 0) self.transport.stopReading() @@ -336,7 +340,7 @@ def fileTransfered(self, d): info(_("File transfer completed, closing connection")) self.transport.loseConnection() - self.factory.finishedCb(self.sid, True) + self.factory.finishedCb(self.sid, True, self.profile) def connectCompleted(self, remotehost, remoteport): debug("connectCompleted") @@ -395,8 +399,8 @@ class Socks5ServerFactory(protocol.ServerFactory): protocol = SOCKSv5 - def __init__(self, current_stream, hash_sid_map, finishedCb): - self.current_stream = current_stream + def __init__(self, host, hash_sid_map, finishedCb): + self.host = host self.hash_sid_map = hash_sid_map self.finishedCb = finishedCb @@ -409,27 +413,30 @@ class Socks5ClientFactory(protocol.ClientFactory): protocol = SOCKSv5 - def __init__(self, current_stream, sid, iq_id, activateCb, finishedCb, proxy=False): + def __init__(self, current_stream, sid, iq_id, activateCb, finishedCb, proxy=False, profile=None): """Init the Client Factory @param current_stream: current streams data @param sid: Session ID @param iq_id: iq id used to initiate the stream @param activateCb: method to call to activate the stream @param finishedCb: method to call when the stream session is finished - @param proxy: True if we are connecting throught a proxy (and we are a requester)""" + @param proxy: True if we are connecting throught a proxy (and we are a requester) + @param profile: %(doc_profile)s""" + assert(profile) self.data = current_stream[sid] self.sid = sid self.iq_id = iq_id self.activateCb = activateCb self.finishedCb = finishedCb self.proxy = proxy + self.profile = profile def startedConnecting(self, connector): debug (_("Socks 5 client connection started")) def clientConnectionLost(self, connector, reason): - debug (_("Socks 5 client connection lost")) - self.finishedCb(self.sid, reason.type == jab_error.ConnectionDone) #TODO: really check if the state is actually successful + debug (_("Socks 5 client connection lost (reason: %s)"), reason) + self.finishedCb(self.sid, reason.type == jab_error.ConnectionDone, self.profile) #TODO: really check if the state is actually successful class XEP_0065(): @@ -458,12 +465,11 @@ info(_("Plugin XEP_0065 initialization")) #session data - self.current_stream = {} #key: stream_id, value: data(dict) - self.hash_sid_map = {} #key: hash of the transfer session, value: session id + self.hash_sid_map = {} #key: hash of the transfer session, value: (session id, profile) self.host = host debug(_("registering")) - self.server_factory = Socks5ServerFactory(self.current_stream, self.hash_sid_map, self._killId) + self.server_factory = Socks5ServerFactory(host, self.hash_sid_map, lambda sid, success, profile: self._killId(sid, success, profile=profile)) #parameters host.memory.importParams(XEP_0065.params) @@ -476,53 +482,69 @@ def getHandler(self, profile): return XEP_0065_handler(self) + def profileConnected(self, profile): + client = self.host.getClient(profile) + if not client: + raise ProfileNotInCacheError + client.xep_0065_current_stream = {} #key: stream_id, value: data(dict) + def getExternalIP(self): """Return IP visible from outside, by asking to a website""" return getPage("http://www.goffi.org/sat_tools/get_ip.php") - def getProgress(self, sid, data): + def getProgress(self, sid, data, profile): """Fill data with position of current transfer""" + client = self.host.getClient(profile) + if not client: + raise ProfileNotInCacheError try: - file_obj = self.current_stream[sid]["file_obj"] + file_obj = client.xep_0065_current_stream[sid]["file_obj"] data["position"] = str(file_obj.tell()) - data["size"] = str(self.current_stream[sid]["size"]) + data["size"] = str(client.xep_0065_current_stream[sid]["size"]) except: pass - def _timeOut(self, sid): + def _timeOut(self, sid, profile): """Delecte current_stream id, called after timeout - @param id: id of self.current_stream""" - info(_("Socks5 Bytestream: TimeOut reached for id %s") % sid); - self._killId(sid, False, "TIMEOUT") + @param id: id of client.xep_0065_current_stream""" + info(_("Socks5 Bytestream: TimeOut reached for id %s [%s]") % (sid, profile)); + self._killId(sid, False, "TIMEOUT", profile) - def _killId(self, sid, success=False, failure_reason="UNKNOWN"): + def _killId(self, sid, success=False, failure_reason="UNKNOWN", profile=None): """Delete an current_stream id, clean up associated observers - @param sid: id of self.current_stream""" - if not self.current_stream.has_key(sid): + @param sid: id of client.xep_0065_current_stream""" + assert(profile) + client = self.host.getClient(profile) + if not client: + warning(_("Client no more in cache")) + return + if not client.xep_0065_current_stream.has_key(sid): warning(_("kill id called on a non existant id")) return - if self.current_stream[sid].has_key("observer_cb"): - xmlstream = self.current_stream[sid]["xmlstream"] - xmlstream.removeObserver(self.current_stream[sid]["event_data"], self.current_stream[sid]["observer_cb"]) - if self.current_stream[sid]['timer'].active(): - self.current_stream[sid]['timer'].cancel() - if self.current_stream[sid].has_key("size"): - self.host.removeProgressCB(sid) + if client.xep_0065_current_stream[sid].has_key("observer_cb"): + xmlstream = client.xep_0065_current_stream[sid]["xmlstream"] + xmlstream.removeObserver(client.xep_0065_current_stream[sid]["event_data"], client.xep_0065_current_stream[sid]["observer_cb"]) + if client.xep_0065_current_stream[sid]['timer'].active(): + client.xep_0065_current_stream[sid]['timer'].cancel() + if client.xep_0065_current_stream[sid].has_key("size"): + self.host.removeProgressCB(sid, profile) - file_obj = self.current_stream[sid]['file_obj'] - success_cb = self.current_stream[sid]['success_cb'] - failure_cb = self.current_stream[sid]['failure_cb'] + file_obj = client.xep_0065_current_stream[sid]['file_obj'] + success_cb = client.xep_0065_current_stream[sid]['success_cb'] + failure_cb = client.xep_0065_current_stream[sid]['failure_cb'] - del self.current_stream[sid] - if self.hash_sid_map.has_key(sid): - del self.hash_sid_map[sid] + session_hash = client.xep_0065_current_stream[sid].get('hash') + del client.xep_0065_current_stream[sid] + if session_hash in self.hash_sid_map: + #FIXME: check that self.hash_sid_map is correctly cleaned in all cases (timeout, normal flow, etc). + del self.hash_sid_map[session_hash] if success: - success_cb(sid, file_obj, NS_BS) + success_cb(sid, file_obj, NS_BS, profile) else: - failure_cb(sid, file_obj, NS_BS, failure_reason) + failure_cb(sid, file_obj, NS_BS, failure_reason, profile) - def startStream(self, file_obj, to_jid, sid, length, successCb, failureCb, size = None, profile='@NONE@'): + def startStream(self, file_obj, to_jid, sid, length, successCb, failureCb, size = None, profile=None): """Launch the stream workflow @param file_obj: file_obj to send @param to_jid: JID of the recipient @@ -531,16 +553,21 @@ @param successCb: method to call when stream successfuly finished @param failureCb: method to call when something goes wrong @param profile: %(doc_profile)s""" + assert(profile) + client = self.host.getClient(profile) + if not client: + error(_("Unknown profile, this should not happen")) + raise ProfileNotInCacheError + if length != None: error(_('stream length not managed yet')) return; - profile_jid, xmlstream = self.host.getJidNStream(profile) - if not profile_jid or not xmlstream: - error(_("Unknown profile, this should not happen")) - return; - data = self.current_stream[sid] = {} - data["profile"] = profile - data["timer"] = reactor.callLater(TIMEOUT, self._timeOut, sid) + + profile_jid = client.jid + xmlstream = client.xmlstream + + data = client.xep_0065_current_stream[sid] = {} + data["timer"] = reactor.callLater(TIMEOUT, self._timeOut, sid, profile) data["file_obj"] = file_obj data["from"] = profile_jid data["to"] = to_jid @@ -548,11 +575,11 @@ data["failure_cb"] = failureCb data["xmlstream"] = xmlstream data["hash"] = calculateHash(profile_jid, to_jid, sid) - self.hash_sid_map[data["hash"]] = sid + self.hash_sid_map[data["hash"]] = (sid, profile) if size: data["size"] = size - self.host.registerProgressCB(sid, self.getProgress) - iq_elt = client.IQ(xmlstream,'set') + self.host.registerProgressCB(sid, self.getProgress, profile) + iq_elt = jabber_client.IQ(xmlstream,'set') iq_elt["from"] = profile_jid.full() iq_elt["to"] = to_jid.full() query_elt = iq_elt.addElement('query', NS_BS) @@ -570,20 +597,21 @@ streamhost['port'] = self.host.memory.getParamA("Proxy port", "File Transfer", profile_key=profile) streamhost['jid'] = self.host.memory.getParamA("Proxy", "File Transfer", profile_key=profile) - iq_elt.addCallback(self.iqResult, sid) + iq_elt.addCallback(self.iqResult, sid, profile) iq_elt.send() - def iqResult(self, sid, iq_elt): + def iqResult(self, sid, profile, iq_elt): """Called when the result of open iq is received""" if iq_elt["type"] == "error": warning(_("Transfer failed")) return - + client = self.host.getClient(profile) + if not client: + raise ProfileNotInCacheError try: - data = self.current_stream[sid] + data = client.xep_0065_current_stream[sid] file_obj = data["file_obj"] timer = data["timer"] - profile = data["profile"] except KeyError: error(_("Internal error, can't do transfer")) return @@ -607,15 +635,17 @@ if proxy_jid != streamhost_jid: warning(_("Proxy jid is not the same as in parameters, this should not happen")) return - factory = Socks5ClientFactory(self.current_stream, sid, None, self.activateProxyStream, self._killId, True) + factory = Socks5ClientFactory(client.xep_0065_current_stream, sid, None, self.activateProxyStream, lambda sid, success, profile: self._killId(sid, success, profile=profile), True, profile) reactor.connectTCP(proxy_host, int(proxy_port), factory) else: data["start_transfer_cb"](file_obj) #We now activate the stream - def activateProxyStream(self, sid, iq_id, start_transfer_cb): + def activateProxyStream(self, sid, iq_id, start_transfer_cb, profile): debug(_("activating stream")) - data = self.current_stream[sid] - profile = data['profile'] + client = self.host.getClient(profile) + if not client: + raise ProfileNotInCacheError + data = client.xep_0065_current_stream[sid] profile_jid, xmlstream = self.host.getJidNStream(profile) iq_elt = client.IQ(xmlstream,'set') @@ -634,22 +664,26 @@ else: start_transfer_cb(file_obj) - def prepareToReceive(self, from_jid, sid, file_obj, size, success_cb, failure_cb): + def prepareToReceive(self, from_jid, sid, file_obj, size, success_cb, failure_cb, profile): """Called when a bytestream is imminent @param from_jid: jid of the sender @param sid: Stream id @param file_obj: File object where data will be written @param size: full size of the data, or None if unknown @param success_cb: method to call when successfuly finished - @param failure_cb: method to call when something goes wrong""" - data = self.current_stream[sid] = {} + @param failure_cb: method to call when something goes wrong + @param profile: %(doc_profile)s""" + client = self.host.getClient(profile) + if not client: + raise ProfileNotInCacheError + data = client.xep_0065_current_stream[sid] = {} data["from"] = from_jid data["file_obj"] = file_obj data["seq"] = -1 if size: data["size"] = size - self.host.registerProgressCB(sid, self.getProgress) - data["timer"] = reactor.callLater(TIMEOUT, self._timeOut, sid) + self.host.registerProgressCB(sid, self.getProgress, profile) + data["timer"] = reactor.callLater(TIMEOUT, self._timeOut, sid, profile) data["success_cb"] = success_cb data["failure_cb"] = failure_cb @@ -657,20 +691,26 @@ def streamQuery(self, iq_elt, profile): """Get file using byte stream""" debug(_("BS stream query")) - profile_jid, xmlstream = self.host.getJidNStream(profile) + client = self.host.getClient(profile) + + if not client: + raise ProfileNotInCacheError + + xmlstream = client.xmlstream + iq_elt.handled = True query_elt = iq_elt.firstChildElement() sid = query_elt.getAttribute("sid") streamhost_elts = filter(lambda elt: elt.name == 'streamhost', query_elt.elements()) - if not sid in self.current_stream: + if not sid in client.xep_0065_current_stream: warning(_("Ignoring unexpected BS transfer: %s" % sid)) self.sendNotAcceptableError(iq_elt['id'], iq_elt['from'], xmlstream) return - self.current_stream[sid]['timer'].cancel() - self.current_stream[sid]["to"] = jid.JID(iq_elt["to"]) - self.current_stream[sid]["xmlstream"] = xmlstream + client.xep_0065_current_stream[sid]['timer'].cancel() + client.xep_0065_current_stream[sid]["to"] = jid.JID(iq_elt["to"]) + client.xep_0065_current_stream[sid]["xmlstream"] = xmlstream if not streamhost_elts: warning(_("No streamhost found in stream query %s" % sid)) @@ -686,16 +726,19 @@ self.sendBadRequestError(iq_elt['id'], iq_elt['from'], xmlstream) return - self.current_stream[sid]["streamhost"] = (sh_host, sh_port, sh_jid) + client.xep_0065_current_stream[sid]["streamhost"] = (sh_host, sh_port, sh_jid) info (_("Stream proposed: host=[%(host)s] port=[%(port)s]") % {'host':sh_host, 'port':sh_port}) - factory = Socks5ClientFactory(self.current_stream, sid, iq_elt["id"], self.activateStream, self._killId) + factory = Socks5ClientFactory(client.xep_0065_current_stream, sid, iq_elt["id"], self.activateStream, lambda sid, success, profile: self._killId(sid, success, profile=profile), profile=profile) reactor.connectTCP(sh_host, int(sh_port), factory) - def activateStream(self, sid, iq_id): + def activateStream(self, sid, iq_id, profile): + client = self.host.getClient(profile) + if not client: + raise ProfileNotInCacheError debug(_("activating stream")) result = domish.Element((None, 'iq')) - data = self.current_stream[sid] + data = client.xep_0065_current_stream[sid] result['type'] = 'result' result['id'] = iq_id result['from'] = data["to"].full() @@ -769,7 +812,7 @@ if not proxy_ent: debug(_("No proxy found on this server")) return - iq_elt = client.IQ(self.parent.xmlstream,'get') + iq_elt = jabber_client.IQ(self.parent.xmlstream,'get') iq_elt["to"] = proxy_ent.full() query_elt = iq_elt.addElement('query', NS_BS) iq_elt.addCallback(self._proxyDataResult)