Mercurial > libervia-backend
diff src/plugins/plugin_xep_0047.py @ 538:2c4016921403
core, frontends, bridgen plugins: fixed methods which were unproperly managing multi-profiles
- added profile argument to askConfirmation, actionResult, actionResultExt, entityDataUpdated, confirmationAnswer, getProgress
- core, frontends: fixed calls/signals according to new bridge API
- user of proper profile namespace for progression indicators and dialogs
- memory: getParam* now return bool when param type is bool
- memory: added getStringParam* to return string instead of typed value
- core, memory, storage, quick_frontend: getHistory now manage properly multi-profiles
- plugins XEP-0047, XEP-0054, XEP-0065, XEP-0077, XEP-0096; multi-profiles proper handling
author | Goffi <goffi@goffi.org> |
---|---|
date | Sat, 10 Nov 2012 16:38:16 +0100 |
parents | a31abb97310d |
children | ca13633d3b6b |
line wrap: on
line diff
--- a/src/plugins/plugin_xep_0047.py Sun Nov 04 23:53:26 2012 +0100 +++ b/src/plugins/plugin_xep_0047.py Sat Nov 10 16:38:16 2012 +0100 @@ -20,11 +20,11 @@ """ from logging import debug, info, warning, error -from twisted.words.protocols.jabber import client, jid -from twisted.words.protocols.jabber import error as jab_error +from twisted.words.protocols.jabber import client as jabber_client, jid from twisted.words.xish import domish import twisted.internet.error from twisted.internet import reactor +from sat.core.exceptions import ProfileNotInCacheError from wokkel import disco, iwokkel @@ -63,66 +63,82 @@ def __init__(self, host): info(_("In-Band Bytestreams plugin initialization")) self.host = host - self.current_stream = {} #key: stream_id, value: data(dict) def getHandler(self, profile): return XEP_0047_handler(self) - def _timeOut(self, sid): + def profileConnected(self, profile): + client = self.host.getClient(profile) + if not client: + raise ProfileNotInCacheError + client.xep_0047_current_stream = {} #key: stream_id, value: data(dict) + + def _timeOut(self, sid, profile): """Delecte current_stream id, called after timeout - @param id: id of self.current_stream""" - info(_("In-Band Bytestream: TimeOut reached for id %s") % sid); - self._killId(sid, False, "TIMEOUT") + @param id: id of client.xep_0047_current_stream""" + info(_("In-Band Bytestream: TimeOut reached for id %s [%s]") % (sid, profile)); + self._killId(sid, False, "TIMEOUT", profile) - def _killId(self, sid, success=False, failure_reason="UNKNOWN"): + def _killId(self, sid, success=False, failure_reason="UNKNOWN", profile=None): """Delete an current_stream id, clean up associated observers - @param sid: id of self.current_stream""" - if not self.current_stream.has_key(sid): + @param sid: id of client.xep_0047_current_stream""" + assert(profile) + client = self.host.getClient(profile) + if not client: + warning(_("Client no more in cache")) + return + if not client.xep_0047_current_stream.has_key(sid): warning(_("kill id called on a non existant id")) return - if self.current_stream[sid].has_key("observer_cb"): - xmlstream = self.current_stream[sid]["xmlstream"] - xmlstream.removeObserver(self.current_stream[sid]["event_data"], self.current_stream[sid]["observer_cb"]) - if self.current_stream[sid]['timer'].active(): - self.current_stream[sid]['timer'].cancel() - if self.current_stream[sid].has_key("size"): - self.host.removeProgressCB(sid) + if client.xep_0047_current_stream[sid].has_key("observer_cb"): + client.xmlstream.removeObserver(client.xep_0047_current_stream[sid]["event_data"], client.xep_0047_current_stream[sid]["observer_cb"]) + if client.xep_0047_current_stream[sid]['timer'].active(): + client.xep_0047_current_stream[sid]['timer'].cancel() + if client.xep_0047_current_stream[sid].has_key("size"): + self.host.removeProgressCB(sid, profile) - file_obj = self.current_stream[sid]['file_obj'] - success_cb = self.current_stream[sid]['success_cb'] - failure_cb = self.current_stream[sid]['failure_cb'] + file_obj = client.xep_0047_current_stream[sid]['file_obj'] + success_cb = client.xep_0047_current_stream[sid]['success_cb'] + failure_cb = client.xep_0047_current_stream[sid]['failure_cb'] - del self.current_stream[sid] + del client.xep_0047_current_stream[sid] if success: - success_cb(sid, file_obj, NS_IBB) + success_cb(sid, file_obj, NS_IBB, profile) else: - failure_cb(sid, file_obj, NS_IBB, failure_reason) + failure_cb(sid, file_obj, NS_IBB, failure_reason, profile) - def getProgress(self, sid, data): + def getProgress(self, sid, data, profile): """Fill data with position of current transfer""" + client = self.host.getClient(profile) + if not client: + raise ProfileNotInCacheError try: - file_obj = self.current_stream[sid]["file_obj"] + file_obj = client.xep_0047_current_stream[sid]["file_obj"] data["position"] = str(file_obj.tell()) - data["size"] = str(self.current_stream[sid]["size"]) + data["size"] = str(client.xep_0047_current_stream[sid]["size"]) except: pass - def prepareToReceive(self, from_jid, sid, file_obj, size, success_cb, failure_cb): + def prepareToReceive(self, from_jid, sid, file_obj, size, success_cb, failure_cb, profile): """Called when a bytestream is imminent @param from_jid: jid of the sender @param sid: Stream id @param file_obj: File object where data will be written @param size: full size of the data, or None if unknown @param success_cb: method to call when successfuly finished - @param failure_cb: method to call when something goes wrong""" - data = self.current_stream[sid] = {} + @param failure_cb: method to call when something goes wrong + @param profile: %(doc_profile)s""" + client = self.host.getClient(profile) + if not client: + raise ProfileNotInCacheError + data = client.xep_0047_current_stream[sid] = {} data["from"] = from_jid data["file_obj"] = file_obj data["seq"] = -1 if size: data["size"] = size - self.host.registerProgressCB(sid, self.getProgress) + self.host.registerProgressCB(sid, self.getProgress, profile) data["timer"] = reactor.callLater(TIMEOUT, self._timeOut, sid) data["success_cb"] = success_cb data["failure_cb"] = failure_cb @@ -130,47 +146,51 @@ def streamOpening(self, IQ, profile): debug(_("IBB stream opening")) IQ.handled=True - profile_jid, xmlstream = self.host.getJidNStream(profile) + client = self.host.getClient(profile) + if not client: + raise ProfileNotInCacheError open_elt = IQ.firstChildElement() block_size = open_elt.getAttribute('block-size') sid = open_elt.getAttribute('sid') stanza = open_elt.getAttribute('stanza', 'iq') if not sid or not block_size or int(block_size)>65535: warning(_("malformed IBB transfer: %s" % IQ['id'])) - self.sendNotAcceptableError(IQ['id'], IQ['from'], xmlstream) + self.sendNotAcceptableError(IQ['id'], IQ['from'], client.xmlstream) return - if not sid in self.current_stream: + if not sid in client.xep_0047_current_stream: warning(_("Ignoring unexpected IBB transfer: %s" % sid)) - self.sendNotAcceptableError(IQ['id'], IQ['from'], xmlstream) + self.sendNotAcceptableError(IQ['id'], IQ['from'], client.xmlstream) return - if self.current_stream[sid]["from"] != jid.JID(IQ['from']): + if client.xep_0047_current_stream[sid]["from"] != jid.JID(IQ['from']): warning(_("sended jid inconsistency (man in the middle attack attempt ?)")) - self.sendNotAcceptableError(IQ['id'], IQ['from'], xmlstream) - self._killId(sid, False, "PROTOCOL_ERROR") + self.sendNotAcceptableError(IQ['id'], IQ['from'], client.xmlstream) + self._killId(sid, False, "PROTOCOL_ERROR", profile=profile) return #at this stage, the session looks ok and will be accepted #we reset the timeout: - self.current_stream[sid]["timer"].reset(TIMEOUT) + client.xep_0047_current_stream[sid]["timer"].reset(TIMEOUT) #we save the xmlstream, events and observer data to allow observer removal - self.current_stream[sid]["xmlstream"] = xmlstream - self.current_stream[sid]["event_data"] = event_data = (IBB_MESSAGE_DATA if stanza=='message' else IBB_IQ_DATA) % sid - self.current_stream[sid]["observer_cb"] = observer_cb = self.messageData if stanza=='message' else self.iqData + client.xep_0047_current_stream[sid]["event_data"] = event_data = (IBB_MESSAGE_DATA if stanza=='message' else IBB_IQ_DATA) % sid + client.xep_0047_current_stream[sid]["observer_cb"] = observer_cb = self.messageData if stanza=='message' else self.iqData event_close = IBB_CLOSE % sid #we now set the stream observer to look after data packet - xmlstream.addObserver(event_data, observer_cb, profile = profile) - xmlstream.addOnetimeObserver(event_close, self.streamClosing, profile = profile) + client.xmlstream.addObserver(event_data, observer_cb, profile = profile) + client.xmlstream.addOnetimeObserver(event_close, self.streamClosing, profile = profile) #finally, we send the accept stanza result = domish.Element((None, 'iq')) result['type'] = 'result' result['id'] = IQ['id'] result['to'] = IQ['from'] - xmlstream.send(result) + client.xmlstream.send(result) def streamClosing(self, IQ, profile): IQ.handled=True + client = self.host.getClient(profile) + if not client: + raise ProfileNotInCacheError debug(_("IBB stream closing")) data_elt = IQ.firstChildElement() sid = data_elt.getAttribute('sid') @@ -178,55 +198,60 @@ result['type'] = 'result' result['id'] = IQ['id'] result['to'] = IQ['from'] - self.current_stream[sid]["xmlstream"].send(result) - self._killId(sid, success=True) + client.xmlstream.send(result) + self._killId(sid, success=True, profile=profile) def iqData(self, IQ, profile): IQ.handled=True + client = self.host.getClient(profile) + if not client: + raise ProfileNotInCacheError data_elt = IQ.firstChildElement() - if self._manageDataElt(data_elt, 'iq', IQ['id'], jid.JID(IQ['from'])): + if self._manageDataElt(data_elt, 'iq', IQ['id'], jid.JID(IQ['from']), profile): #and send a success answer result = domish.Element((None, 'iq')) result['type'] = 'result' result['id'] = IQ['id'] result['to'] = IQ['from'] - _jid, xmlstream = self.host.getJidNStream(profile) - xmlstream.send(result) + + client.xmlstream.send(result) def messageData(self, message_elt, profile): data_elt = message_elt.firstChildElement() sid = message_elt.getAttribute('id','') - self._manageDataElt(message_elt, 'message', sid, jid.JID(message_elt['from'])) + self._manageDataElt(message_elt, 'message', sid, jid.JID(message_elt['from']), profile) - def _manageDataElt(self, data_elt, stanza, sid, stanza_from_jid): + def _manageDataElt(self, data_elt, stanza, sid, stanza_from_jid, profile): """Manage the data elelement (check validity and write to the file_obj) @param data_elt: "data" domish element @return: True if success""" + client = self.host.getClient(profile) + if not client: + raise ProfileNotInCacheError sid = data_elt.getAttribute('sid') - if sid not in self.current_stream: + if sid not in client.xep_0047_current_stream: error(_("Received data for an unknown session id")) return False - xmlstream = self.current_stream[sid]["xmlstream"] - from_jid = self.current_stream[sid]["from"] - file_obj = self.current_stream[sid]["file_obj"] + from_jid = client.xep_0047_current_stream[sid]["from"] + file_obj = client.xep_0047_current_stream[sid]["file_obj"] if stanza_from_jid != from_jid: warning(_("sended jid inconsistency (man in the middle attack attempt ?)")) if stanza=='iq': - self.sendNotAcceptableError(sid, from_jid, xmlstream) + self.sendNotAcceptableError(sid, from_jid, client.xmlstream) return False - self.current_stream[sid]["seq"]+=1 - if int(data_elt.getAttribute("seq",-1)) != self.current_stream[sid]["seq"]: + client.xep_0047_current_stream[sid]["seq"]+=1 + if int(data_elt.getAttribute("seq",-1)) != client.xep_0047_current_stream[sid]["seq"]: warning(_("Sequence error")) if stanza=='iq': - self.sendNotAcceptableError(IQ["id"], from_jid, xmlstream) + self.sendNotAcceptableError(data_elt["id"], from_jid, client.xmlstream) return False #we reset the timeout: - self.current_stream[sid]["timer"].reset(TIMEOUT) + client.xep_0047_current_stream[sid]["timer"].reset(TIMEOUT) #we can now decode the data try: @@ -235,7 +260,7 @@ #The base64 data is invalid warning(_("Invalid base64 data")) if stanza=='iq': - self.sendNotAcceptableError(IQ["id"], from_jid, xmlstream) + self.sendNotAcceptableError(data_elt["id"], from_jid, client.xmlstream) return False return True @@ -253,7 +278,7 @@ error_el.addElement(('urn:ietf:params:xml:ns:xmpp-stanzas','not-acceptable')) xmlstream.send(result) - def startStream(self, file_obj, to_jid, sid, length, successCb, failureCb, size = None, profile='@NONE@'): + def startStream(self, file_obj, to_jid, sid, length, successCb, failureCb, size = None, profile=None): """Launch the stream workflow @param file_obj: file_obj to send @param to_jid: JID of the recipient @@ -262,34 +287,38 @@ @param successCb: method to call when stream successfuly finished @param failureCb: method to call when something goes wrong @param profile: %(doc_profile)s""" + client = self.host.getClient(profile) + if not client: + raise ProfileNotInCacheError if length != None: error(_('stream length not managed yet')) return; - profile_jid, xmlstream = self.host.getJidNStream(profile) - data = self.current_stream[sid] = {} + data = client.xep_0047_current_stream[sid] = {} data["timer"] = reactor.callLater(TIMEOUT, self._timeOut, sid) data["file_obj"] = file_obj data["to"] = to_jid data["success_cb"] = successCb data["failure_cb"] = failureCb - data["xmlstream"] = xmlstream data["block_size"] = BLOCK_SIZE if size: data["size"] = size - self.host.registerProgressCB(sid, self.getProgress) - iq_elt = client.IQ(xmlstream,'set') - iq_elt['from'] = profile_jid.full() + self.host.registerProgressCB(sid, self.getProgress, profile) + iq_elt = jabber_client.IQ(client.xmlstream,'set') + iq_elt['from'] = client.jid.full() iq_elt['to'] = to_jid.full() open_elt = iq_elt.addElement('open',NS_IBB) open_elt['block-size'] = str(BLOCK_SIZE) open_elt['sid'] = sid open_elt['stanza'] = 'iq' - iq_elt.addCallback(self.iqResult, sid, 0, length) + iq_elt.addCallback(self.iqResult, sid, 0, length, profile) iq_elt.send() - def iqResult(self, sid, seq, length, iq_elt): + def iqResult(self, sid, seq, length, profile, iq_elt): """Called when the result of open iq is received""" - data = self.current_stream[sid] + client = self.host.getClient(profile) + if not client: + raise ProfileNotInCacheError + data = client.xep_0047_current_stream[sid] if iq_elt["type"] == "error": warning(_("Transfer failed")) self.terminateStream(sid, "IQ_ERROR") @@ -300,18 +329,18 @@ buffer = data["file_obj"].read(data["block_size"]) if buffer: - next_iq_elt = client.IQ(data["xmlstream"],'set') + next_iq_elt = jabber_client.IQ(client.xmlstream,'set') next_iq_elt['to'] = data["to"].full() data_elt = next_iq_elt.addElement('data', NS_IBB) data_elt['seq'] = str(seq) data_elt['sid'] = sid data_elt.addContent(base64.b64encode(buffer)) - next_iq_elt.addCallback(self.iqResult, sid, seq+1, length) + next_iq_elt.addCallback(self.iqResult, sid, seq+1, length, profile) next_iq_elt.send() else: - self.terminateStream(sid) + self.terminateStream(sid, profile=profile) - def terminateStream(self, sid, failure_reason = None): + def terminateStream(self, sid, failure_reason = None, profile=None): """Terminate the stream session @param to_jid: recipient @param sid: Session id @@ -320,17 +349,20 @@ @param progress_cb: True if we have to remove the progress callback @param callback: method to call after finishing @param failure_reason: reason of the failure, or None if steam was successful""" - data = self.current_stream[sid] - iq_elt = client.IQ(data["xmlstream"],'set') + client = self.host.getClient(profile) + if not client: + raise ProfileNotInCacheError + data = client.xep_0047_current_stream[sid] + iq_elt = jabber_client.IQ(client.xmlstream,'set') iq_elt['to'] = data["to"].full() close_elt = iq_elt.addElement('close',NS_IBB) close_elt['sid'] = sid iq_elt.send() - self.host.removeProgressCB(sid) + self.host.removeProgressCB(sid, profile) if failure_reason: - self._killId(sid, False, failure_reason) + self._killId(sid, False, failure_reason, profile=profile) else: - self._killId(sid, True) + self._killId(sid, True, profile=profile) class XEP_0047_handler(XMPPHandler): implements(iwokkel.IDisco)