Mercurial > libervia-backend
changeset 538:2c4016921403
core, frontends, bridgen plugins: fixed methods which were unproperly managing multi-profiles
- added profile argument to askConfirmation, actionResult, actionResultExt, entityDataUpdated, confirmationAnswer, getProgress
- core, frontends: fixed calls/signals according to new bridge API
- user of proper profile namespace for progression indicators and dialogs
- memory: getParam* now return bool when param type is bool
- memory: added getStringParam* to return string instead of typed value
- core, memory, storage, quick_frontend: getHistory now manage properly multi-profiles
- plugins XEP-0047, XEP-0054, XEP-0065, XEP-0077, XEP-0096; multi-profiles proper handling
author | Goffi <goffi@goffi.org> |
---|---|
date | Sat, 10 Nov 2012 16:38:16 +0100 |
parents | 28cddc96c4ed |
children | 428fa16363e7 |
files | frontends/src/bridge/DBus.py frontends/src/jp/jp frontends/src/primitivus/progress.py frontends/src/quick_frontend/quick_app.py frontends/src/wix/chat.py frontends/src/wix/main_window.py src/bridge/DBus.py src/bridge/bridge_constructor/bridge_template.ini src/core/exceptions.py src/core/sat_main.py src/core/xmpp.py src/memory/memory.py src/memory/sqlite.py src/plugins/plugin_exp_pipe.py src/plugins/plugin_misc_xmllog.py src/plugins/plugin_xep_0047.py src/plugins/plugin_xep_0054.py src/plugins/plugin_xep_0065.py src/plugins/plugin_xep_0077.py src/plugins/plugin_xep_0096.py |
diffstat | 20 files changed, 570 insertions(+), 400 deletions(-) [+] |
line wrap: on
line diff
--- a/frontends/src/bridge/DBus.py Sun Nov 04 23:53:26 2012 +0100 +++ b/frontends/src/bridge/DBus.py Sat Nov 10 16:38:16 2012 +0100 @@ -75,8 +75,8 @@ def callMenu(self, category, name, menu_type, profile_key): return unicode(self.db_core_iface.callMenu(category, name, menu_type, profile_key)) - def confirmationAnswer(self, id, accepted, data): - return self.db_core_iface.confirmationAnswer(id, accepted, data) + def confirmationAnswer(self, id, accepted, data, profile): + return self.db_core_iface.confirmationAnswer(id, accepted, data, profile) def connect(self, profile_key="@DEFAULT@"): return self.db_core_iface.connect(profile_key) @@ -105,8 +105,8 @@ def getEntityData(self, jid, keys, profile): return self.db_core_iface.getEntityData(jid, keys, profile) - def getHistory(self, from_jid, to_jid, limit, between=True, callback=None, errback=None): - return self.db_core_iface.getHistory(from_jid, to_jid, limit, between, reply_handler=callback, error_handler=lambda err:errback(err._dbus_error_name[len(const_ERROR_PREFIX)+1:])) + def getHistory(self, from_jid, to_jid, limit, between=True, profile="@NONE@", callback=None, errback=None): + return self.db_core_iface.getHistory(from_jid, to_jid, limit, between, profile, reply_handler=callback, error_handler=lambda err:errback(err._dbus_error_name[len(const_ERROR_PREFIX)+1:])) def getLastResource(self, contact_jid, profile_key="@DEFAULT@"): return unicode(self.db_core_iface.getLastResource(contact_jid, profile_key)) @@ -141,8 +141,8 @@ def getProfilesList(self, ): return self.db_core_iface.getProfilesList() - def getProgress(self, id): - return self.db_core_iface.getProgress(id) + def getProgress(self, id, profile): + return self.db_core_iface.getProgress(id, profile) def getVersion(self, ): return unicode(self.db_core_iface.getVersion())
--- a/frontends/src/jp/jp Sun Nov 04 23:53:26 2012 +0100 +++ b/frontends/src/jp/jp Sat Nov 10 16:38:16 2012 +0100 @@ -53,7 +53,6 @@ import os from os.path import abspath, basename, dirname from optparse import OptionParser -import pdb from sat.tools.jid import JID import gobject from sat_frontends.bridge.DBus import DBusBridgeFrontend,BridgeExceptionNoService @@ -79,7 +78,7 @@ print(_(u"Can't connect to SàT backend, are you sure it's launched ?")) import sys sys.exit(1) - self.transfer_id = None + self.transfer_data = None def check_options(self): """Check command line options""" @@ -242,7 +241,7 @@ for file in self.files: if not os.path.exists(file): - error (_("File [%s] doesn't exist !") % file) + error (_(u"File [%s] doesn't exist !") % file) exit(1) if not self.options.bz2 and os.path.isdir(file): error (_("[%s] is a dir ! Please send files inside or use compression") % file) @@ -265,11 +264,11 @@ bz2.close() info(_("OK !")) path = abspath(tmpfile) - self.transfer_id = self.bridge.sendFile(full_dest_jid, path, {}, profile_key=self.profile) + self.transfer_data = self.bridge.sendFile(full_dest_jid, path, {}, profile_key=self.profile) else: for file in self.files: path = abspath(file) - self.transfer_id = self.bridge.sendFile(full_dest_jid, path, {}, profile_key=self.profile) #FIXME: show progress only for last transfer_id + self.transfer_data = self.bridge.sendFile(full_dest_jid, path, {}, profile_key=self.profile) #FIXME: show progress only for last transfer_id def _getFullJid(self, param_jid): @@ -283,8 +282,11 @@ return param_jid - def askConfirmation(self, type, id, data): + def askConfirmation(self, type, confirm_id, data, profile): """CB used for file transfer, accept files depending on parameters""" + if profile != self.profile: + debug("Ask confirmation ignored: not our profile") + return answer_data={} if type == "FILE_TRANSFER": if not self.options.wait_file: @@ -295,11 +297,11 @@ answer_data["dest_path"] = os.getcwd()+'/'+data['filename'] if self.options.force or not os.path.exists(answer_data["dest_path"]): - self.bridge.confirmationAnswer(id, True, answer_data) + self.bridge.confirmationAnswer(confirm_id, True, answer_data, profile) info(_("Accepted file [%(filename)s] from %(sender)s") % {'filename':data['filename'], 'sender':data['from']}) - self.transfer_id = id + self.transfer_data = confirm_id else: - self.bridge.confirmationAnswer(id, False, answer_data) + self.bridge.confirmationAnswer(confirm_id, False, answer_data, profile) warning(_("Refused file [%(filename)s] from %(sender)s: a file with the same name already exist") % {'filename':data['filename'], 'sender':data['from']}) @@ -316,14 +318,14 @@ fifopath = os.path.join(tmp_dir,"pipe_in") answer_data["dest_path"] = fifopath os.mkfifo(fifopath) - self.bridge.confirmationAnswer(id, True, answer_data) + self.bridge.confirmationAnswer(confirm_id, True, answer_data, profile) with open(fifopath, 'r') as f: shutil.copyfileobj(f, sys.stdout) shutil.rmtree(tmp_dir) self.loop.quit() - def actionResult(self, type, id, data): + def actionResult(self, action_type, action_id, data, profile): #FIXME info (_("FIXME: actionResult not implemented")) @@ -332,8 +334,9 @@ self.bridge.register("askConfirmation", self.askConfirmation) def progressCB(self): - if self.transfer_id: - data = self.bridge.getProgress(self.transfer_id) + if self.transfer_data: + transfer_id = self.transfer_data + data = self.bridge.getProgress(transfer_id, self.profile) if data: if not data['position']: data['position'] = '0'
--- a/frontends/src/primitivus/progress.py Sun Nov 04 23:53:26 2012 +0100 +++ b/frontends/src/primitivus/progress.py Sat Nov 10 16:38:16 2012 +0100 @@ -21,7 +21,6 @@ import urwid from urwid_satext import sat_widgets -from sat.tools.jid import JID class Progress(urwid.WidgetWrap): @@ -38,47 +37,48 @@ main_wid = sat_widgets.FocusFrame(listbox, footer=buttons_wid) urwid.WidgetWrap.__init__(self, main_wid) - def addProgress(self, id, message): + def addProgress(self, progress_id, message): + profile = self.host.profile # TODO: manage multiple profiles mess_wid = urwid.Text(message) progr_wid = urwid.ProgressBar('progress_normal', 'progress_complete') column = urwid.Columns([mess_wid, progr_wid]) - self.progress_dict[id] = {'full':column,'progress':progr_wid,'state':'init'} + self.progress_dict[(progress_id, profile)] = {'full':column,'progress':progr_wid,'state':'init'} self.progress_list.append(column) - self.progressCB(self.host.loop, (id, message)) + self.progressCB(self.host.loop, (progress_id, message, profile)) def progressCB(self, loop, data): - id, message = data - data = self.host.bridge.getProgress(id) - pbar = self.progress_dict[id]['progress'] + progress_id, message, profile = data + data = self.host.bridge.getProgress(progress_id, profile) + pbar = self.progress_dict[(progress_id, profile)]['progress'] #FIXME: must manage profiles if data: - if self.progress_dict[id]['state'] == 'init': + if self.progress_dict[(progress_id, profile)]['state'] == 'init': #first answer, we must construct the bar - self.progress_dict[id]['state'] = 'progress' + self.progress_dict[(progress_id, profile)]['state'] = 'progress' pbar.done = float(data['size']) pbar.set_completion(float(data['position'])) self.updateNotBar() else: - if self.progress_dict[id]['state'] == 'progress': - self.progress_dict[id]['state'] = 'done' + if self.progress_dict[(progress_id, profile)]['state'] == 'progress': + self.progress_dict[(progress_id, profile)]['state'] = 'done' pbar.set_completion(pbar.done) self.updateNotBar() return - loop.set_alarm_in(1,self.progressCB, (id, message)) + loop.set_alarm_in(1,self.progressCB, (progress_id, message, profile)) - def __removeBar(self, id): - wid = self.progress_dict[id]['full'] + def __removeBar(self, progress_id, profile): + wid = self.progress_dict[(progress_id, profile)]['full'] self.progress_list.remove(wid) - del(self.progress_dict[id]) + del(self.progress_dict[(progress_id, profile)]) def __onClear(self, button): to_remove = [] - for id in self.progress_dict: - if self.progress_dict[id]['state'] == 'done': - to_remove.append(id) - for id in to_remove: - self.__removeBar(id) + for progress_id, profile in self.progress_dict: + if self.progress_dict[(progress_id, profile)]['state'] == 'done': + to_remove.append((progress_id, profile)) + for progress_id, profile in to_remove: + self.__removeBar(progress_id, profile) self.updateNotBar() def updateNotBar(self): @@ -87,8 +87,8 @@ return progress = 0 nb_bars = 0 - for id in self.progress_dict: - pbar = self.progress_dict[id]['progress'] + for progress_id, profile in self.progress_dict: + pbar = self.progress_dict[(progress_id, profile)]['progress'] progress += pbar.current/pbar.done*100 nb_bars+=1 av_progress = progress/float(nb_bars)
--- a/frontends/src/quick_frontend/quick_app.py Sun Nov 04 23:53:26 2012 +0100 +++ b/frontends/src/quick_frontend/quick_app.py Sat Nov 10 16:38:16 2012 +0100 @@ -114,7 +114,7 @@ self.options.profile = self.options.profile.decode('utf-8') return args - def _getParamError(self): + def _getParamError(self, ignore): error(_("Can't get profile parameter")) def plug_profile(self, profile_key='@DEFAULT@'): @@ -546,7 +546,7 @@ self.contact_list.setCache(jid, 'avatar', filename) self.contact_list.replace(jid) - def askConfirmation(self, type, id, data): + def askConfirmation(self, type, id, data, profile): raise NotImplementedError def actionResult(self, type, id, data):
--- a/frontends/src/wix/chat.py Sun Nov 04 23:53:26 2012 +0100 +++ b/frontends/src/wix/chat.py Sat Nov 10 16:38:16 2012 +0100 @@ -265,7 +265,7 @@ else: full_jid = self.target id = self.host.bridge.sendFile(full_jid, filename, {}, self.host.profile) - self.host.waitProgress(id, _("File Transfer"), _("Copying %s") % os.path.basename(filename)) + self.host.waitProgress(id, _("File Transfer"), _("Copying %s") % os.path.basename(filename), self.host.profile) def onStartTarot(self, e): debug (_("Starting Tarot game"))
--- a/frontends/src/wix/main_window.py Sun Nov 04 23:53:26 2012 +0100 +++ b/frontends/src/wix/main_window.py Sat Nov 10 16:38:16 2012 +0100 @@ -223,11 +223,13 @@ self.tools.Disable() return - def askConfirmation(self, type, id, data): + def askConfirmation(self, confirmation_type, confirmation_id, data, profile): #TODO: refactor this in QuickApp + if not self.check_profile(profile): + return debug (_("Confirmation asked")) answer_data={} - if type == "FILE_TRANSFER": + if confirmation_type == "FILE_TRANSFER": debug (_("File transfer confirmation asked")) dlg = wx.MessageDialog(self, _("The contact %(jid)s wants to send you the file %(filename)s\nDo you accept ?") % {'jid':data["from"], 'filename':data["filename"]}, _('File Request'), @@ -238,16 +240,16 @@ filename = wx.FileSelector(_("Where do you want to save the file ?"), flags = wx.FD_SAVE | wx.FD_OVERWRITE_PROMPT) if filename: answer_data["dest_path"] = filename - self.bridge.confirmationAnswer(id, True, answer_data) - self.waitProgress(id, _("File Transfer"), _("Copying %s") % os.path.basename(filename)) + self.bridge.confirmationAnswer(confirmation_id, True, answer_data, profile) + self.waitProgress(confirmation_id, _("File Transfer"), _("Copying %s") % os.path.basename(filename), profile) else: answer = wx.ID_NO if answer==wx.ID_NO: - self.bridge.confirmationAnswer(id, False, answer_data) + self.bridge.confirmationAnswer(confirmation_id, False, answer_data, profile) dlg.Destroy() - elif type == "YES/NO": + elif confirmation_type == "YES/NO": debug (_("Yes/No confirmation asked")) dlg = wx.MessageDialog(self, data["message"], _('Confirmation'), @@ -255,13 +257,15 @@ ) answer=dlg.ShowModal() if answer==wx.ID_YES: - self.bridge.confirmationAnswer(id, True, {}) + self.bridge.confirmationAnswer(confirmation_id, True, {}, profile) if answer==wx.ID_NO: - self.bridge.confirmationAnswer(id, False, {}) + self.bridge.confirmationAnswer(confirmation_id, False, {}, profile) dlg.Destroy() - def actionResult(self, type, id, data): + def actionResult(self, type, id, data, profile): + if not self.check_profile(profile): + return debug (_("actionResult: type = [%(type)s] id = [%(id)s] data = [%(data)s]") % {'type':type, 'id':id, 'data':data}) if not id in self.current_action_ids: debug (_('unknown id, ignoring')) @@ -313,8 +317,8 @@ - def progressCB(self, id, title, message): - data = self.bridge.getProgress(id) + def progressCB(self, progress_id, title, message, profile): + data = self.bridge.getProgress(progress_id, profile) if data: if not self.pbar: #first answer, we must construct the bar @@ -327,11 +331,11 @@ self.pbar.Update(self.pbar.finish_value) return - wx.CallLater(10, self.progressCB, id, title, message) + wx.CallLater(10, self.progressCB, progress_id, title, message, profile) - def waitProgress (self, id, title, message): + def waitProgress (self, progress_id, title, message, profile): self.pbar = None - wx.CallLater(10, self.progressCB, id, title, message) + wx.CallLater(10, self.progressCB, progress_id, title, message, profile)
--- a/src/bridge/DBus.py Sun Nov 04 23:53:26 2012 +0100 +++ b/src/bridge/DBus.py Sat Nov 10 16:38:16 2012 +0100 @@ -105,18 +105,18 @@ pass @dbus.service.signal(const_INT_PREFIX+const_CORE_SUFFIX, - signature='ssa{ss}') - def actionResult(self, answer_type, id, data): + signature='ssa{ss}s') + def actionResult(self, answer_type, id, data, profile): pass @dbus.service.signal(const_INT_PREFIX+const_CORE_SUFFIX, - signature='ssa{sa{ss}}') - def actionResultExt(self, answer_type, id, data): + signature='ssa{sa{ss}}s') + def actionResultExt(self, answer_type, id, data, profile): pass @dbus.service.signal(const_INT_PREFIX+const_CORE_SUFFIX, - signature='ssa{ss}') - def askConfirmation(self, conf_type, id, data): + signature='ssa{ss}s') + def askConfirmation(self, conf_type, id, data, profile): pass @dbus.service.signal(const_INT_PREFIX+const_CORE_SUFFIX, @@ -208,10 +208,10 @@ return self._callback("callMenu", unicode(category), unicode(name), unicode(menu_type), unicode(profile_key)) @dbus.service.method(const_INT_PREFIX+const_CORE_SUFFIX, - in_signature='sba{ss}', out_signature='', + in_signature='sba{ss}s', out_signature='', async_callbacks=None) - def confirmationAnswer(self, id, accepted, data): - return self._callback("confirmationAnswer", unicode(id), accepted, data) + def confirmationAnswer(self, id, accepted, data, profile): + return self._callback("confirmationAnswer", unicode(id), accepted, data, unicode(profile)) @dbus.service.method(const_INT_PREFIX+const_CORE_SUFFIX, in_signature='s', out_signature='', @@ -268,10 +268,10 @@ return self._callback("getEntityData", unicode(jid), keys, unicode(profile)) @dbus.service.method(const_INT_PREFIX+const_CORE_SUFFIX, - in_signature='ssib', out_signature='a(dssss)', + in_signature='ssibs', out_signature='a(dssss)', async_callbacks=('callback', 'errback')) - def getHistory(self, from_jid, to_jid, limit, between=True, callback=None, errback=None): - return self._callback("getHistory", unicode(from_jid), unicode(to_jid), limit, between, callback=callback, errback=errback) + def getHistory(self, from_jid, to_jid, limit, between=True, profile="@NONE@", callback=None, errback=None): + return self._callback("getHistory", unicode(from_jid), unicode(to_jid), limit, between, unicode(profile), callback=callback, errback=errback) @dbus.service.method(const_INT_PREFIX+const_CORE_SUFFIX, in_signature='ss', out_signature='s', @@ -340,10 +340,10 @@ return self._callback("getProfilesList", ) @dbus.service.method(const_INT_PREFIX+const_CORE_SUFFIX, - in_signature='s', out_signature='a{ss}', + in_signature='ss', out_signature='a{ss}', async_callbacks=None) - def getProgress(self, id): - return self._callback("getProgress", unicode(id)) + def getProgress(self, id, profile): + return self._callback("getProgress", unicode(id), unicode(profile)) @dbus.service.method(const_INT_PREFIX+const_CORE_SUFFIX, in_signature='', out_signature='s', @@ -503,14 +503,14 @@ self.dbus_name = dbus.service.BusName(const_INT_PREFIX, self.session_bus) self.dbus_bridge = DbusObject(self.session_bus, const_OBJ_PATH) - def actionResult(self, answer_type, id, data): - self.dbus_bridge.actionResult(answer_type, id, data) + def actionResult(self, answer_type, id, data, profile): + self.dbus_bridge.actionResult(answer_type, id, data, profile) - def actionResultExt(self, answer_type, id, data): - self.dbus_bridge.actionResultExt(answer_type, id, data) + def actionResultExt(self, answer_type, id, data, profile): + self.dbus_bridge.actionResultExt(answer_type, id, data, profile) - def askConfirmation(self, conf_type, id, data): - self.dbus_bridge.askConfirmation(conf_type, id, data) + def askConfirmation(self, conf_type, id, data, profile): + self.dbus_bridge.askConfirmation(conf_type, id, data, profile) def connected(self, profile): self.dbus_bridge.connected(profile)
--- a/src/bridge/bridge_constructor/bridge_template.ini Sun Nov 04 23:53:26 2012 +0100 +++ b/src/bridge/bridge_constructor/bridge_template.ini Sat Nov 10 16:38:16 2012 +0100 @@ -106,18 +106,19 @@ [askConfirmation] type=signal category=core -sig_in=ssa{ss} +sig_in=ssa{ss}s doc=A confirmation is needed for an action doc_param_0=conf_type: Type of the confirmation, can be: - YES/NO: A question which need a yes or no answer - FILE_TRANSFER: A confirmation is needed before transfering a file doc_param_1=id: Id of the confirmation query doc_param_2=data: conf_type dependent data +doc_param_3=%(doc_profile)s [actionResult] type=signal category=core -sig_in=ssa{ss} +sig_in=ssa{ss}s doc=Requested result of an action doc_param_0=answer_type: Type of the answer, can be: - SUPPRESS: The action is managed, the id MUST be removed from queue @@ -126,16 +127,18 @@ - RESULT: General result, interpretation depend of the action doc_param_1=id: Id of the action doc_param_2=data: answer_type specific data +doc_param_3=%(doc_profile)s [actionResultExt] type=signal category=core -sig_in=ssa{sa{ss}} +sig_in=ssa{sa{ss}}s doc=Requested result of an action (Extended) doc_param_0=answer_type: Same as for [actionResult] but with the following additional one: - DICT_DICT: As RESULT, but returned as a dictionary of dictionary doc_param_1=id: Id of the action doc_param_2=data: answer_type specific data +doc_param_3=%(doc_profile)s [entityDataUpdated] type=signal @@ -462,14 +465,16 @@ async= type=method category=core -sig_in=ssib +sig_in=ssibs sig_out=a(dssss) param_3_default=True +param_4_default="@NONE@" doc=Get history of a communication between two entities doc_param_0=from_jid: source JID (bare jid for catch all, full jid else) doc_param_1=to_jid: dest JID (bare jid for catch all, full jid else) doc_param_2=limit: max number of history elements to get (0 for the whole history) doc_param_3=between: True if we want history between the two jids (in both direction), False if we only want messages from from_jid to to_jid +doc_param_4=%(doc_profile)s doc_return=Ordered list (by timestamp) of tuples (timestamp, full from_jid, full to_jid, message, type) [addContact] @@ -519,20 +524,22 @@ [confirmationAnswer] type=method category=core -sig_in=sba{ss} +sig_in=sba{ss}s sig_out= doc=Give answer to a confirmation request doc_param_0=id: id of the confirmation request doc_param_1=accepted: True if the action is confirmed doc_param_2=data: action specific data +doc_param_3=%(doc_profile)s [getProgress] type=method category=core -sig_in=s +sig_in=ss sig_out=a{ss} doc=Get progress information for an action doc_param_0=id: id of the progression status +doc_param_1=%(doc_profile)s doc_return=dict with progress information: - position: current position - size: end position
--- a/src/core/exceptions.py Sun Nov 04 23:53:26 2012 +0100 +++ b/src/core/exceptions.py Sat Nov 10 16:38:16 2012 +0100 @@ -33,3 +33,6 @@ class UnknownGroupError(Exception): pass + +class NotFound(Exception): + pass
--- a/src/core/sat_main.py Sun Nov 04 23:53:26 2012 +0100 +++ b/src/core/sat_main.py Sat Nov 10 16:38:16 2012 +0100 @@ -44,7 +44,7 @@ import os.path from sat.core import xmpp -from sat.core.exceptions import ProfileUnknownError, UnknownEntityError +from sat.core.exceptions import ProfileUnknownError, UnknownEntityError, ProfileNotInCacheError from sat.memory.memory import Memory from sat.tools.xml_tools import tupleList2dataForm from sat.tools.misc import TriggerManager @@ -98,8 +98,6 @@ def __init__(self): #TODO: standardize callback system - self.__waiting_conf = {} #callback called when a confirmation is received - self.__progress_cb_map = {} #callback called when a progress is requested (key = progress id) self.__general_cb_map = {} #callback called for general reasons (key = name) self.__private_data = {} #used for internal callbacks (key = id) self.profiles = {} @@ -134,8 +132,8 @@ self.bridge.register("sendMessage", self.sendMessage) self.bridge.register("getConfig", self.memory.getConfig) self.bridge.register("setParam", self.setParam) - self.bridge.register("getParamA", self.memory.getParamA) - self.bridge.register("asyncGetParamA", self.memory.asyncGetParamA) + self.bridge.register("getParamA", self.memory.getStringParamA) + self.bridge.register("asyncGetParamA", self.memory.asyncGetStringParamA) self.bridge.register("getParamsUI", self.memory.getParamsUI) self.bridge.register("getParams", self.memory.getParams) self.bridge.register("getParamsForCategory", self.memory.getParamsForCategory) @@ -336,11 +334,13 @@ return None return self.profiles[profile] - def registerNewAccount(self, login, password, email, server, port = 5222, id = None): + def registerNewAccount(self, login, password, email, server, port = 5222, id = None, profile_key = '@DEFAULT@'): """Connect to a server and create a new account using in-band registration""" + profile = self.memory.getProfileName(profile_key) + assert(profile) next_id = id or sat_next_id() #the id is used to send server's answer - serverRegistrer = xmlstream.XmlStreamFactory(xmpp.RegisteringAuthenticator(self, server, login, password, email, next_id)) + serverRegistrer = xmlstream.XmlStreamFactory(xmpp.RegisteringAuthenticator(self, server, login, password, email, next_id, profile)) connector = reactor.connectTCP(server, port, serverRegistrer) serverRegistrer.clientConnectionLost = lambda conn, reason: connector.disconnect() @@ -354,7 +354,7 @@ if not user or not password or not server: info (_('No user or server given')) #TODO: a proper error message must be sent to frontend - self.actionResult(id, "ERROR", {'message':_("No user, password or server given, can't register new account.")}) + self.actionResult(id, "ERROR", {'message':_("No user, password or server given, can't register new account.")}, profile) return confirm_id = sat_next_id() @@ -362,12 +362,12 @@ self.askConfirmation(confirm_id, "YES/NO", {"message":_("Are you sure to register new account [%(user)s] to server %(server)s ?") % {'user':user, 'server':server, 'profile':profile}}, - self.regisConfirmCB) + self.regisConfirmCB, profile) print ("===============+++++++++++ REGISTER NEW ACCOUNT++++++++++++++============") print "id=",id print "data=",data - def regisConfirmCB(self, id, accepted, data): + def regisConfirmCB(self, id, accepted, data, profile): print _("register Confirmation CB ! (%s)") % str(accepted) action_id,profile = self.__private_data[id] del self.__private_data[id] @@ -377,7 +377,7 @@ server = self.memory.getParamA("Server", "Connection", profile_key=profile) self.registerNewAccount(user, password, None, server, id=action_id) else: - self.actionResult(action_id, "SUPPRESS", {}) + self.actionResult(action_id, "SUPPRESS", {}, profile) def submitForm(self, action, target, fields, profile_key): """submit a form @@ -601,73 +601,88 @@ ## Generic HMI ## - def actionResult(self, id, type, data): + def actionResult(self, action_id, action_type, data, profile): """Send the result of an action - @param id: same id used with action - @param type: result type ("PARAM", "SUCCESS", "ERROR", "XMLUI") + @param action_id: same action_id used with action + @param action_type: result action_type ("PARAM", "SUCCESS", "ERROR", "XMLUI") @param data: dictionary """ - self.bridge.actionResult(type, id, data) + self.bridge.actionResult(action_type, action_id, data, profile) - def actionResultExt(self, id, type, data): + def actionResultExt(self, action_id, action_type, data, profile): """Send the result of an action, extended version - @param id: same id used with action - @param type: result type /!\ only "DICT_DICT" for this method + @param action_id: same action_id used with action + @param action_type: result action_type /!\ only "DICT_DICT" for this method @param data: dictionary of dictionaries """ - if type != "DICT_DICT": - error(_("type for actionResultExt must be DICT_DICT, fixing it")) - type = "DICT_DICT" - self.bridge.actionResultExt(type, id, data) + if action_type != "DICT_DICT": + error(_("action_type for actionResultExt must be DICT_DICT, fixing it")) + action_type = "DICT_DICT" + self.bridge.actionResultExt(action_type, action_id, data, profile) - def askConfirmation(self, id, type, data, cb): + def askConfirmation(self, conf_id, conf_type, data, cb, profile): """Add a confirmation callback - @param id: id used to get answer - @param type: confirmation type ("YES/NO", "FILE_TRANSFER") - @param data: data (depend of confirmation type) + @param conf_id: conf_id used to get answer + @param conf_type: confirmation conf_type ("YES/NO", "FILE_TRANSFER") + @param data: data (depend of confirmation conf_type) @param cb: callback called with the answer """ - if self.__waiting_conf.has_key(id): + client = self.getClient(profile) + if not client: + raise ProfileUnknownError(_("Asking confirmation a non-existant profile")) + if client._waiting_conf.has_key(conf_id): error (_("Attempt to register two callbacks for the same confirmation")) else: - self.__waiting_conf[id] = cb - self.bridge.askConfirmation(type, id, data) + client._waiting_conf[conf_id] = cb + self.bridge.askConfirmation(conf_type, conf_id, data, profile) - def confirmationAnswer(self, id, accepted, data): + def confirmationAnswer(self, conf_id, accepted, data, profile): """Called by frontends to answer confirmation requests""" - debug (_("Received confirmation answer for id [%(id)s]: %(success)s") % {'id': id, 'success':_("accepted") if accepted else _("refused")}) - if not self.__waiting_conf.has_key(id): + client = self.getClient(profile) + if not client: + raise ProfileUnknownError(_("Confirmation answer from a non-existant profile")) + debug (_("Received confirmation answer for conf_id [%(conf_id)s]: %(success)s") % {'conf_id': conf_id, 'success':_("accepted") if accepted else _("refused")}) + if not client._waiting_conf.has_key(conf_id): error (_("Received an unknown confirmation")) else: - cb = self.__waiting_conf[id] - del self.__waiting_conf[id] - cb(id, accepted, data) + cb = client._waiting_conf[conf_id] + del client._waiting_conf[conf_id] + cb(conf_id, accepted, data, profile) - def registerProgressCB(self, id, CB): + def registerProgressCB(self, progress_id, CB, profile): """Register a callback called when progress is requested for id""" - self.__progress_cb_map[id] = CB + client = self.getClient(profile) + if not client: + raise ProfileUnknownError + client._progress_cb_map[progress_id] = CB - def removeProgressCB(self, id): + def removeProgressCB(self, progress_id, profile): """Remove a progress callback""" - if not self.__progress_cb_map.has_key(id): + client = self.getClient(profile) + if not client: + raise ProfileUnknownError + if not client._progress_cb_map.has_key(progress_id): error (_("Trying to remove an unknow progress callback")) else: - del self.__progress_cb_map[id] + del client._progress_cb_map[progress_id] - def getProgress(self, id): + def getProgress(self, progress_id, profile): """Return a dict with progress information data['position'] : current possition data['size'] : end_position """ + client = self.getClient(profile) + if not profile: + raise ProfileNotInCacheError data = {} try: - self.__progress_cb_map[id](id, data) + client._progress_cb_map[progress_id](progress_id, data, profile) except KeyError: pass - #debug("Requested progress for unknown id") + #debug("Requested progress for unknown progress_id") return data def registerGeneralCB(self, name, CB):
--- a/src/core/xmpp.py Sun Nov 04 23:53:26 2012 +0100 +++ b/src/core/xmpp.py Sat Nov 10 16:38:16 2012 +0100 @@ -37,6 +37,9 @@ self.host_app = host_app self.client_initialized = defer.Deferred() self.conn_deferred = defer.Deferred() + self._waiting_conf = {} #callback called when a confirmation is received + self._progress_cb_map = {} #callback called when a progress is requested (key = progress id) + def getConnectionDeferred(self): """Return a deferred which fire when the client is connected""" @@ -360,7 +363,7 @@ class RegisteringAuthenticator(xmlstream.ConnectAuthenticator): - def __init__(self, host, jabber_host, user_login, user_pass, email, answer_id): + def __init__(self, host, jabber_host, user_login, user_pass, email, answer_id, profile): xmlstream.ConnectAuthenticator.__init__(self, jabber_host) self.host = host self.jabber_host = jabber_host @@ -368,6 +371,7 @@ self.user_pass = user_pass self.user_email = email self.answer_id = answer_id + self.profile = profile print _("Registration asked for"),user_login, user_pass, jabber_host def connectionMade(self): @@ -392,7 +396,7 @@ debug (_("registration answer: %s") % answer.toXml()) answer_type = "SUCCESS" answer_data={"message":_("Registration successfull")} - self.host.bridge.actionResult(answer_type, self.answer_id, answer_data) + self.host.bridge.actionResult(answer_type, self.answer_id, answer_data, self.profile) self.xmlstream.sendFooter() def registrationFailure(self, failure): @@ -405,7 +409,7 @@ else: answer_data['reason'] = 'unknown' answer_data={"message":_("Registration failed (%s)") % str(failure.value.condition)} - self.host.bridge.actionResult(answer_type, self.answer_id, answer_data) + self.host.bridge.actionResult(answer_type, self.answer_id, answer_data, self.profile) self.xmlstream.sendFooter() class SatVersionHandler(generic.VersionHandler):
--- a/src/memory/memory.py Sun Nov 04 23:53:26 2012 +0100 +++ b/src/memory/memory.py Sat Nov 10 16:38:16 2012 +0100 @@ -227,6 +227,28 @@ d.addCallback(self.__default_ok, name, category) d.addErrback(errback or self.__default_ko, name, category) + def __getAttr(self, node, attr, value): + """ get attribute value + @param node: XML param node + @param attr: name of the attribute to get (e.g.: 'value' or 'type') + @param value: user defined value""" + if attr == 'value': + value_to_use = value if value!=None else node.getAttribute(attr) #we use value (user defined) if it exist, else we use node's default value + if node.getAttribute('type') == 'bool': + return value_to_use.lower() not in ('false','0') + return value_to_use + return node.getAttribute(attr) + + def __type_to_string(self, result): + """ convert result to string, according to its type """ + if isinstance(result,bool): + return "true" if result else "false" + return result + + def getStringParamA(self, name, category, attr="value", profile_key="@DEFAULT@"): + """ Same as getParamA but for bridge: convert non string value to string """ + return self.__type_to_string(self.getParamA(name, category, attr, profile_key)) + def getParamA(self, name, category, attr="value", profile_key="@DEFAULT@"): """Helper method to get a specific attribute @param name: name of the parameter @@ -235,31 +257,35 @@ @param profile: owner of the param (@ALL@ for everyone) @return: attribute""" + #FIXME: looks really dirty and buggy, need to be reviewed/refactored node = self.__getParamNode(name, category) if not node: error(_("Requested param [%(name)s] in category [%(category)s] doesn't exist !") % {'name':name, 'category':category}) - return "" + raise exceptions.NotFound if node[0] == 'general': value = self.__getParam(None, category, name, 'general') - return value if value!=None else node[1].getAttribute(attr) + return self.__getAttr(node[1], attr, value) assert(node[0] == 'individual') profile = self.getProfileName(profile_key) if not profile: error(_('Requesting a param for an non-existant profile')) - return "" + raise exceptions.ProfileUnknownError if profile not in self.params: error(_('Requesting synchronous param for not connected profile')) - return "" + raise exceptions.ConnectedProfileError if attr == "value": value = self.__getParam(profile, category, name) - return value if value!=None else node[1].getAttribute(attr) - else: - return node[1].getAttribute(attr) + return self.__getAttr(node[1], attr, value) + + def asyncGetStringParamA(self, name, category, attr="value", profile_key="@DEFAULT@"): + d = self.asyncGetParamA(name, category, attr, profile_key) + d.addCallback(self.__type_to_string) + return d def asyncGetParamA(self, name, category, attr="value", profile_key="@DEFAULT@"): """Helper method to get a specific attribute @@ -274,7 +300,7 @@ if node[0] == 'general': value = self.__getParam(None, category, name, 'general') - return defer.succeed(value if value!=None else node[1].getAttribute(attr)) + return defer.succeed(self.__getAttr(node[1], attr, value)) assert(node[0] == 'individual') @@ -285,21 +311,20 @@ if attr != "value": return defer.succeed(node[1].getAttribute(attr)) - default = node[1].getAttribute(attr) try: value = self.__getParam(profile, category, name) - return defer.succeed(value if value!=None else default) + return defer.succeed(self.__getAttr(node[1], attr, value)) except exceptions.ProfileNotInCacheError: #We have to ask data to the storage manager d = self.storage.getIndParam(category, name, profile) - return d.addCallback(lambda value: value if value!=None else default) + return d.addCallback(lambda value: self.__getAttr(node[1], attr, value)) def __getParam(self, profile, category, name, _type='individual', cache=None): """Return the param, or None if it doesn't exist @param profile: the profile name (not profile key, i.e. name and not something like @DEFAULT@) @param category: param category @param name: param name - @param type: "general" or "individual" + @param _type: "general" or "individual" @param cache: temporary cache, to use when profile is not logged @return: param value or None if it doesn't exist """ @@ -614,8 +639,9 @@ assert(profile!="@NONE@") return self.storage.addToHistory(from_jid, to_jid, message, _type, timestamp, profile) - def getHistory(self, from_jid, to_jid, limit=0, between=True): - return self.storage.getHistory(jid.JID(from_jid), jid.JID(to_jid), limit, between) + def getHistory(self, from_jid, to_jid, limit=0, between=True, profile="@NONE@"): + assert(profile != "@NONE@") + return self.storage.getHistory(jid.JID(from_jid), jid.JID(to_jid), limit, between, profile) def addServerFeature(self, feature, profile): """Add a feature discovered from server @@ -788,11 +814,14 @@ return self.subscriptions[profile] + def getStringParamA(self, name, category, attr="value", profile_key='@DEFAULT@'): + return self.params.getStringParamA(name, category, attr, profile_key) + def getParamA(self, name, category, attr="value", profile_key='@DEFAULT@'): return self.params.getParamA(name, category, attr, profile_key) - def asyncGetParamA(self, name, category, attr="value", profile_key='@DEFAULT@'): - return self.params.asyncGetParamA(name, category, attr, profile_key) + def asyncGetStringParamA(self, name, category, attr="value", profile_key='@DEFAULT@'): + return self.params.asyncGetStringParamA(name, category, attr, profile_key) def getParamsUI(self, profile_key): return self.params.getParamsUI(profile_key)
--- a/src/memory/sqlite.py Sun Nov 04 23:53:26 2012 +0100 +++ b/src/memory/sqlite.py Sat Nov 10 16:38:16 2012 +0100 @@ -183,7 +183,7 @@ @param _type: message type (see RFC 6121 §5.2.2) @param timestamp: timestamp in seconds since epoch, or None to use current time """ - assert(profile!=None) + assert(profile) d = self.dbpool.runQuery("INSERT INTO history(source, source_res, dest, dest_res, timestamp, message, type, profile_id) VALUES (?,?,?,?,?,?,?,?)", (from_jid.userhost(), from_jid.resource, to_jid.userhost(), to_jid.resource, timestamp or time.time(), message, _type, self.profiles[profile])) @@ -191,12 +191,13 @@ {"from_jid":from_jid.full(), "to_jid":to_jid.full(), "message":message}))) return d - def getHistory(self, from_jid, to_jid, limit=0, between=True): + def getHistory(self, from_jid, to_jid, limit=0, between=True, profile=None): """Store a new message in history @param from_jid: source JID (full, or bare for catchall @param to_jid: dest JID (full, or bare for catchall @param size: maximum number of messages to get, or 0 for unlimited """ + assert(profile) def sqliteToDict(query_result): query_result.reverse() result = [] @@ -208,8 +209,8 @@ return result - query_parts = ["SELECT timestamp, source, source_res, dest, dest_res, message, type FROM history WHERE"] - values = [] + query_parts = ["SELECT timestamp, source, source_res, dest, dest_res, message, type FROM history WHERE profile_id=? AND"] + values = [self.profiles[profile]] def test_jid(_type,_jid): values.append(_jid.userhost())
--- a/src/plugins/plugin_exp_pipe.py Sun Nov 04 23:53:26 2012 +0100 +++ b/src/plugins/plugin_exp_pipe.py Sat Nov 10 16:38:16 2012 +0100 @@ -21,16 +21,13 @@ from logging import debug, info, warning, error from twisted.words.xish import domish -from twisted.internet import protocol -from twisted.words.protocols.jabber import client, jid +from twisted.words.protocols.jabber import jid from twisted.words.protocols.jabber import error as jab_error import os, os.path from twisted.internet import reactor -import pdb +from sat.core.exceptions import ProfileNotInCacheError -from zope.interface import implements - -from wokkel import disco, iwokkel, data_form +from wokkel import data_form IQ_SET = '/iq[@type="set"]' PROFILE_NAME = "pipe-transfer" @@ -53,19 +50,23 @@ def __init__(self, host): info(_("Plugin Pipe initialization")) self.host = host - self._waiting_for_approval = {} #key = id, value = [transfer data, IdelayedCall Reactor timeout, - # current stream method, [failed stream methods], profile] self.managed_stream_m = [self.host.plugins["XEP-0065"].NAMESPACE, self.host.plugins["XEP-0047"].NAMESPACE] #Stream methods managed self.host.plugins["XEP-0095"].registerSIProfile(PROFILE_NAME, self.transferRequest) host.bridge.addMethod("pipeOut", ".plugin", in_sign='ssa{ss}s', out_sign='s', method=self.pipeOut) - def _kill_id(self, approval_id): + def profileConnected(self, profile): + client = self.host.getClient(profile) + client._pipe_waiting_for_approval = {} #key = id, value = [transfer data, IdelayedCall Reactor timeout, + # current stream method, [failed stream methods], profile] + + def _kill_id(self, approval_id, profile): """Delete a waiting_for_approval id, called after timeout - @param approval_id: id of _waiting_for_approval""" + @param approval_id: id of _pipe_waiting_for_approval""" info(_("SI Pipe Transfer: TimeOut reached for id %s") % approval_id); try: - del self._waiting_for_approval[approval_id] + client = self.host.getClient(profile) + del client._pipe_waiting_for_approval[approval_id] except KeyError: warning(_("kill id called on a non existant approval id")) @@ -79,6 +80,9 @@ @param profile: %(doc_profile)s""" info (_("EXP-PIPE file transfer requested")) debug(si_el.toXml()) + client = self.host.getClient(profile) + if not client: + raise ProfileNotInCacheError pipe_elts = filter(lambda elt: elt.name == 'pipe', si_el.elements()) feature_elts = self.host.plugins["XEP-0020"].getFeatureElt(si_el) @@ -107,17 +111,20 @@ #if we are here, the transfer can start, we just need user's agreement data={ "id": iq_id, "from":from_jid } - self._waiting_for_approval[si_id] = [data, reactor.callLater(300, self._kill_id, si_id), stream_method, [], profile] + client._pipe_waiting_for_approval[si_id] = [data, reactor.callLater(300, self._kill_id, si_id), stream_method, [], profile] - self.host.askConfirmation(si_id, "PIPE_TRANSFER", data, self.confirmationCB) + self.host.askConfirmation(si_id, "PIPE_TRANSFER", data, self.confirmationCB, profile) - def confirmationCB(self, sid, accepted, frontend_data): + def confirmationCB(self, sid, accepted, frontend_data, profile): """Called on confirmation answer @param sid: file transfer session id @param accepted: True if file transfer is accepted @param frontend_data: data sent by frontend""" - data, timeout, stream_method, failed_methods, profile = self._waiting_for_approval[sid] + client = self.host.getClient(profile) + if not client: + raise ProfileNotInCacheError + data, timeout, stream_method, failed_methods, profile = client._pipe_waiting_for_approval[sid] if accepted: if timeout.active(): timeout.cancel() @@ -125,17 +132,17 @@ dest_path = frontend_data['dest_path'] except KeyError: error(_('dest path not found in frontend_data')) - del(self._waiting_for_approval[sid]) + del(client._pipe_waiting_for_approval[sid]) return if stream_method == self.host.plugins["XEP-0065"].NAMESPACE: file_obj = open(dest_path, 'w+') - self.host.plugins["XEP-0065"].prepareToReceive(jid.JID(data['from']), sid, file_obj, None, self._transferSucceeded, self._transferFailed) + self.host.plugins["XEP-0065"].prepareToReceive(jid.JID(data['from']), sid, file_obj, None, self._transferSucceeded, self._transferFailed, profile) elif stream_method == self.host.plugins["XEP-0047"].NAMESPACE: file_obj = open(dest_path, 'w+') - self.host.plugins["XEP-0047"].prepareToReceive(jid.JID(data['from']), sid, file_obj, None, self._transferSucceeded, self._transferFailed) + self.host.plugins["XEP-0047"].prepareToReceive(jid.JID(data['from']), sid, file_obj, None, self._transferSucceeded, self._transferFailed, profile) else: error(_("Unknown stream method, this should not happen at this stage, cancelling transfer")) - del(self._waiting_for_approval[sid]) + del(client._pipe_waiting_for_approval[sid]) return #we can send the iq result @@ -146,29 +153,35 @@ else: debug (_("Transfer [%s] refused"), sid) self.host.plugins["XEP-0095"].sendRejectedError (data["id"], data['from'], profile=profile) - del(self._waiting_for_approval[sid]) + del(client._pipe_waiting_for_approval[sid]) - def _transferSucceeded(self, sid, file_obj, stream_method): + def _transferSucceeded(self, sid, file_obj, stream_method, profile): """Called by the stream method when transfer successfuly finished @param id: stream id""" + client = self.host.getClient(profile) + if not client: + raise ProfileNotInCacheError file_obj.close() info(_('Transfer %s successfuly finished') % sid) - del(self._waiting_for_approval[sid]) + del(client._pipe_waiting_for_approval[sid]) - def _transferFailed(self, sid, file_obj, stream_method, reason): + def _transferFailed(self, sid, file_obj, stream_method, reason, profile): """Called when something went wrong with the transfer @param id: stream id @param reason: can be TIMEOUT, IO_ERROR, PROTOCOL_ERROR""" - data, timeout, stream_method, failed_methods, profile = self._waiting_for_approval[sid] + client = self.host.getClient(profile) + if not client: + raise ProfileNotInCacheError + data, timeout, stream_method, failed_methods, profile = client._pipe_waiting_for_approval[sid] warning(_('Transfer %(id)s failed with stream method %(s_method)s') % { 'id': sid, 's_method': stream_method }) filepath = file_obj.name file_obj.close() #TODO: session remenber (within a time limit) when a stream method fail, and avoid that stream method with full jid for the rest of the session warning(_("All stream methods failed, can't transfer the file")) - del(self._waiting_for_approval[sid]) + del(client._pipe_waiting_for_approval[sid]) - def pipeCb(self, profile, filepath, sid, IQ): + def pipeCb(self, filepath, sid, profile, IQ): if IQ['type'] == "error": stanza_err = jab_error.exceptionFromStanza(IQ) if stanza_err.code == '403' and stanza_err.condition == 'forbidden': @@ -230,13 +243,14 @@ pipe_transfer_elts.append(pipe_elt) sid, offer = self.host.plugins["XEP-0095"].proposeStream(jid.JID(to_jid), PROFILE, feature_elt, pipe_transfer_elts, profile_key = profile) - offer.addCallback(self.pipeCb, profile, filepath, sid) + offer.addCallback(self.pipeCb, filepath, sid, profile) return sid - def sendSuccessCb(self, sid, file_obj, stream_method): + def sendSuccessCb(self, sid, file_obj, stream_method, profile): info(_('Transfer %s successfuly finished') % sid) file_obj.close() - def sendFailureCb(self, sid, file_obj, stream_method, reason): + def sendFailureCb(self, sid, file_obj, stream_method, reason, profile): file_obj.close() - warning(_('Transfer %(id)s failed with stream method %(s_method)s') % { 'id': sid, "s_method": stream_method }) + warning(_('Transfer %(id)s failed with stream method %(s_method)s %(profile)s') % { 'id': sid, "s_method": stream_method, "profile": profile }) +
--- a/src/plugins/plugin_misc_xmllog.py Sun Nov 04 23:53:26 2012 +0100 +++ b/src/plugins/plugin_misc_xmllog.py Sat Nov 10 16:38:16 2012 +0100 @@ -74,7 +74,7 @@ #bridge host.bridge.addSignal("xmlLog", ".plugin", signature='sss') #args: direction("IN" or "OUT"), xml_data, profile - do_log = bool(self.host.memory.getParamA("Xml log", "Debug")) + do_log = self.host.memory.getParamA("Xml log", "Debug") if do_log: info(_("XML log activated")) host.trigger.add("XML Initialized", self.logXml)
--- a/src/plugins/plugin_xep_0047.py Sun Nov 04 23:53:26 2012 +0100 +++ b/src/plugins/plugin_xep_0047.py Sat Nov 10 16:38:16 2012 +0100 @@ -20,11 +20,11 @@ """ from logging import debug, info, warning, error -from twisted.words.protocols.jabber import client, jid -from twisted.words.protocols.jabber import error as jab_error +from twisted.words.protocols.jabber import client as jabber_client, jid from twisted.words.xish import domish import twisted.internet.error from twisted.internet import reactor +from sat.core.exceptions import ProfileNotInCacheError from wokkel import disco, iwokkel @@ -63,66 +63,82 @@ def __init__(self, host): info(_("In-Band Bytestreams plugin initialization")) self.host = host - self.current_stream = {} #key: stream_id, value: data(dict) def getHandler(self, profile): return XEP_0047_handler(self) - def _timeOut(self, sid): + def profileConnected(self, profile): + client = self.host.getClient(profile) + if not client: + raise ProfileNotInCacheError + client.xep_0047_current_stream = {} #key: stream_id, value: data(dict) + + def _timeOut(self, sid, profile): """Delecte current_stream id, called after timeout - @param id: id of self.current_stream""" - info(_("In-Band Bytestream: TimeOut reached for id %s") % sid); - self._killId(sid, False, "TIMEOUT") + @param id: id of client.xep_0047_current_stream""" + info(_("In-Band Bytestream: TimeOut reached for id %s [%s]") % (sid, profile)); + self._killId(sid, False, "TIMEOUT", profile) - def _killId(self, sid, success=False, failure_reason="UNKNOWN"): + def _killId(self, sid, success=False, failure_reason="UNKNOWN", profile=None): """Delete an current_stream id, clean up associated observers - @param sid: id of self.current_stream""" - if not self.current_stream.has_key(sid): + @param sid: id of client.xep_0047_current_stream""" + assert(profile) + client = self.host.getClient(profile) + if not client: + warning(_("Client no more in cache")) + return + if not client.xep_0047_current_stream.has_key(sid): warning(_("kill id called on a non existant id")) return - if self.current_stream[sid].has_key("observer_cb"): - xmlstream = self.current_stream[sid]["xmlstream"] - xmlstream.removeObserver(self.current_stream[sid]["event_data"], self.current_stream[sid]["observer_cb"]) - if self.current_stream[sid]['timer'].active(): - self.current_stream[sid]['timer'].cancel() - if self.current_stream[sid].has_key("size"): - self.host.removeProgressCB(sid) + if client.xep_0047_current_stream[sid].has_key("observer_cb"): + client.xmlstream.removeObserver(client.xep_0047_current_stream[sid]["event_data"], client.xep_0047_current_stream[sid]["observer_cb"]) + if client.xep_0047_current_stream[sid]['timer'].active(): + client.xep_0047_current_stream[sid]['timer'].cancel() + if client.xep_0047_current_stream[sid].has_key("size"): + self.host.removeProgressCB(sid, profile) - file_obj = self.current_stream[sid]['file_obj'] - success_cb = self.current_stream[sid]['success_cb'] - failure_cb = self.current_stream[sid]['failure_cb'] + file_obj = client.xep_0047_current_stream[sid]['file_obj'] + success_cb = client.xep_0047_current_stream[sid]['success_cb'] + failure_cb = client.xep_0047_current_stream[sid]['failure_cb'] - del self.current_stream[sid] + del client.xep_0047_current_stream[sid] if success: - success_cb(sid, file_obj, NS_IBB) + success_cb(sid, file_obj, NS_IBB, profile) else: - failure_cb(sid, file_obj, NS_IBB, failure_reason) + failure_cb(sid, file_obj, NS_IBB, failure_reason, profile) - def getProgress(self, sid, data): + def getProgress(self, sid, data, profile): """Fill data with position of current transfer""" + client = self.host.getClient(profile) + if not client: + raise ProfileNotInCacheError try: - file_obj = self.current_stream[sid]["file_obj"] + file_obj = client.xep_0047_current_stream[sid]["file_obj"] data["position"] = str(file_obj.tell()) - data["size"] = str(self.current_stream[sid]["size"]) + data["size"] = str(client.xep_0047_current_stream[sid]["size"]) except: pass - def prepareToReceive(self, from_jid, sid, file_obj, size, success_cb, failure_cb): + def prepareToReceive(self, from_jid, sid, file_obj, size, success_cb, failure_cb, profile): """Called when a bytestream is imminent @param from_jid: jid of the sender @param sid: Stream id @param file_obj: File object where data will be written @param size: full size of the data, or None if unknown @param success_cb: method to call when successfuly finished - @param failure_cb: method to call when something goes wrong""" - data = self.current_stream[sid] = {} + @param failure_cb: method to call when something goes wrong + @param profile: %(doc_profile)s""" + client = self.host.getClient(profile) + if not client: + raise ProfileNotInCacheError + data = client.xep_0047_current_stream[sid] = {} data["from"] = from_jid data["file_obj"] = file_obj data["seq"] = -1 if size: data["size"] = size - self.host.registerProgressCB(sid, self.getProgress) + self.host.registerProgressCB(sid, self.getProgress, profile) data["timer"] = reactor.callLater(TIMEOUT, self._timeOut, sid) data["success_cb"] = success_cb data["failure_cb"] = failure_cb @@ -130,47 +146,51 @@ def streamOpening(self, IQ, profile): debug(_("IBB stream opening")) IQ.handled=True - profile_jid, xmlstream = self.host.getJidNStream(profile) + client = self.host.getClient(profile) + if not client: + raise ProfileNotInCacheError open_elt = IQ.firstChildElement() block_size = open_elt.getAttribute('block-size') sid = open_elt.getAttribute('sid') stanza = open_elt.getAttribute('stanza', 'iq') if not sid or not block_size or int(block_size)>65535: warning(_("malformed IBB transfer: %s" % IQ['id'])) - self.sendNotAcceptableError(IQ['id'], IQ['from'], xmlstream) + self.sendNotAcceptableError(IQ['id'], IQ['from'], client.xmlstream) return - if not sid in self.current_stream: + if not sid in client.xep_0047_current_stream: warning(_("Ignoring unexpected IBB transfer: %s" % sid)) - self.sendNotAcceptableError(IQ['id'], IQ['from'], xmlstream) + self.sendNotAcceptableError(IQ['id'], IQ['from'], client.xmlstream) return - if self.current_stream[sid]["from"] != jid.JID(IQ['from']): + if client.xep_0047_current_stream[sid]["from"] != jid.JID(IQ['from']): warning(_("sended jid inconsistency (man in the middle attack attempt ?)")) - self.sendNotAcceptableError(IQ['id'], IQ['from'], xmlstream) - self._killId(sid, False, "PROTOCOL_ERROR") + self.sendNotAcceptableError(IQ['id'], IQ['from'], client.xmlstream) + self._killId(sid, False, "PROTOCOL_ERROR", profile=profile) return #at this stage, the session looks ok and will be accepted #we reset the timeout: - self.current_stream[sid]["timer"].reset(TIMEOUT) + client.xep_0047_current_stream[sid]["timer"].reset(TIMEOUT) #we save the xmlstream, events and observer data to allow observer removal - self.current_stream[sid]["xmlstream"] = xmlstream - self.current_stream[sid]["event_data"] = event_data = (IBB_MESSAGE_DATA if stanza=='message' else IBB_IQ_DATA) % sid - self.current_stream[sid]["observer_cb"] = observer_cb = self.messageData if stanza=='message' else self.iqData + client.xep_0047_current_stream[sid]["event_data"] = event_data = (IBB_MESSAGE_DATA if stanza=='message' else IBB_IQ_DATA) % sid + client.xep_0047_current_stream[sid]["observer_cb"] = observer_cb = self.messageData if stanza=='message' else self.iqData event_close = IBB_CLOSE % sid #we now set the stream observer to look after data packet - xmlstream.addObserver(event_data, observer_cb, profile = profile) - xmlstream.addOnetimeObserver(event_close, self.streamClosing, profile = profile) + client.xmlstream.addObserver(event_data, observer_cb, profile = profile) + client.xmlstream.addOnetimeObserver(event_close, self.streamClosing, profile = profile) #finally, we send the accept stanza result = domish.Element((None, 'iq')) result['type'] = 'result' result['id'] = IQ['id'] result['to'] = IQ['from'] - xmlstream.send(result) + client.xmlstream.send(result) def streamClosing(self, IQ, profile): IQ.handled=True + client = self.host.getClient(profile) + if not client: + raise ProfileNotInCacheError debug(_("IBB stream closing")) data_elt = IQ.firstChildElement() sid = data_elt.getAttribute('sid') @@ -178,55 +198,60 @@ result['type'] = 'result' result['id'] = IQ['id'] result['to'] = IQ['from'] - self.current_stream[sid]["xmlstream"].send(result) - self._killId(sid, success=True) + client.xmlstream.send(result) + self._killId(sid, success=True, profile=profile) def iqData(self, IQ, profile): IQ.handled=True + client = self.host.getClient(profile) + if not client: + raise ProfileNotInCacheError data_elt = IQ.firstChildElement() - if self._manageDataElt(data_elt, 'iq', IQ['id'], jid.JID(IQ['from'])): + if self._manageDataElt(data_elt, 'iq', IQ['id'], jid.JID(IQ['from']), profile): #and send a success answer result = domish.Element((None, 'iq')) result['type'] = 'result' result['id'] = IQ['id'] result['to'] = IQ['from'] - _jid, xmlstream = self.host.getJidNStream(profile) - xmlstream.send(result) + + client.xmlstream.send(result) def messageData(self, message_elt, profile): data_elt = message_elt.firstChildElement() sid = message_elt.getAttribute('id','') - self._manageDataElt(message_elt, 'message', sid, jid.JID(message_elt['from'])) + self._manageDataElt(message_elt, 'message', sid, jid.JID(message_elt['from']), profile) - def _manageDataElt(self, data_elt, stanza, sid, stanza_from_jid): + def _manageDataElt(self, data_elt, stanza, sid, stanza_from_jid, profile): """Manage the data elelement (check validity and write to the file_obj) @param data_elt: "data" domish element @return: True if success""" + client = self.host.getClient(profile) + if not client: + raise ProfileNotInCacheError sid = data_elt.getAttribute('sid') - if sid not in self.current_stream: + if sid not in client.xep_0047_current_stream: error(_("Received data for an unknown session id")) return False - xmlstream = self.current_stream[sid]["xmlstream"] - from_jid = self.current_stream[sid]["from"] - file_obj = self.current_stream[sid]["file_obj"] + from_jid = client.xep_0047_current_stream[sid]["from"] + file_obj = client.xep_0047_current_stream[sid]["file_obj"] if stanza_from_jid != from_jid: warning(_("sended jid inconsistency (man in the middle attack attempt ?)")) if stanza=='iq': - self.sendNotAcceptableError(sid, from_jid, xmlstream) + self.sendNotAcceptableError(sid, from_jid, client.xmlstream) return False - self.current_stream[sid]["seq"]+=1 - if int(data_elt.getAttribute("seq",-1)) != self.current_stream[sid]["seq"]: + client.xep_0047_current_stream[sid]["seq"]+=1 + if int(data_elt.getAttribute("seq",-1)) != client.xep_0047_current_stream[sid]["seq"]: warning(_("Sequence error")) if stanza=='iq': - self.sendNotAcceptableError(IQ["id"], from_jid, xmlstream) + self.sendNotAcceptableError(data_elt["id"], from_jid, client.xmlstream) return False #we reset the timeout: - self.current_stream[sid]["timer"].reset(TIMEOUT) + client.xep_0047_current_stream[sid]["timer"].reset(TIMEOUT) #we can now decode the data try: @@ -235,7 +260,7 @@ #The base64 data is invalid warning(_("Invalid base64 data")) if stanza=='iq': - self.sendNotAcceptableError(IQ["id"], from_jid, xmlstream) + self.sendNotAcceptableError(data_elt["id"], from_jid, client.xmlstream) return False return True @@ -253,7 +278,7 @@ error_el.addElement(('urn:ietf:params:xml:ns:xmpp-stanzas','not-acceptable')) xmlstream.send(result) - def startStream(self, file_obj, to_jid, sid, length, successCb, failureCb, size = None, profile='@NONE@'): + def startStream(self, file_obj, to_jid, sid, length, successCb, failureCb, size = None, profile=None): """Launch the stream workflow @param file_obj: file_obj to send @param to_jid: JID of the recipient @@ -262,34 +287,38 @@ @param successCb: method to call when stream successfuly finished @param failureCb: method to call when something goes wrong @param profile: %(doc_profile)s""" + client = self.host.getClient(profile) + if not client: + raise ProfileNotInCacheError if length != None: error(_('stream length not managed yet')) return; - profile_jid, xmlstream = self.host.getJidNStream(profile) - data = self.current_stream[sid] = {} + data = client.xep_0047_current_stream[sid] = {} data["timer"] = reactor.callLater(TIMEOUT, self._timeOut, sid) data["file_obj"] = file_obj data["to"] = to_jid data["success_cb"] = successCb data["failure_cb"] = failureCb - data["xmlstream"] = xmlstream data["block_size"] = BLOCK_SIZE if size: data["size"] = size - self.host.registerProgressCB(sid, self.getProgress) - iq_elt = client.IQ(xmlstream,'set') - iq_elt['from'] = profile_jid.full() + self.host.registerProgressCB(sid, self.getProgress, profile) + iq_elt = jabber_client.IQ(client.xmlstream,'set') + iq_elt['from'] = client.jid.full() iq_elt['to'] = to_jid.full() open_elt = iq_elt.addElement('open',NS_IBB) open_elt['block-size'] = str(BLOCK_SIZE) open_elt['sid'] = sid open_elt['stanza'] = 'iq' - iq_elt.addCallback(self.iqResult, sid, 0, length) + iq_elt.addCallback(self.iqResult, sid, 0, length, profile) iq_elt.send() - def iqResult(self, sid, seq, length, iq_elt): + def iqResult(self, sid, seq, length, profile, iq_elt): """Called when the result of open iq is received""" - data = self.current_stream[sid] + client = self.host.getClient(profile) + if not client: + raise ProfileNotInCacheError + data = client.xep_0047_current_stream[sid] if iq_elt["type"] == "error": warning(_("Transfer failed")) self.terminateStream(sid, "IQ_ERROR") @@ -300,18 +329,18 @@ buffer = data["file_obj"].read(data["block_size"]) if buffer: - next_iq_elt = client.IQ(data["xmlstream"],'set') + next_iq_elt = jabber_client.IQ(client.xmlstream,'set') next_iq_elt['to'] = data["to"].full() data_elt = next_iq_elt.addElement('data', NS_IBB) data_elt['seq'] = str(seq) data_elt['sid'] = sid data_elt.addContent(base64.b64encode(buffer)) - next_iq_elt.addCallback(self.iqResult, sid, seq+1, length) + next_iq_elt.addCallback(self.iqResult, sid, seq+1, length, profile) next_iq_elt.send() else: - self.terminateStream(sid) + self.terminateStream(sid, profile=profile) - def terminateStream(self, sid, failure_reason = None): + def terminateStream(self, sid, failure_reason = None, profile=None): """Terminate the stream session @param to_jid: recipient @param sid: Session id @@ -320,17 +349,20 @@ @param progress_cb: True if we have to remove the progress callback @param callback: method to call after finishing @param failure_reason: reason of the failure, or None if steam was successful""" - data = self.current_stream[sid] - iq_elt = client.IQ(data["xmlstream"],'set') + client = self.host.getClient(profile) + if not client: + raise ProfileNotInCacheError + data = client.xep_0047_current_stream[sid] + iq_elt = jabber_client.IQ(client.xmlstream,'set') iq_elt['to'] = data["to"].full() close_elt = iq_elt.addElement('close',NS_IBB) close_elt['sid'] = sid iq_elt.send() - self.host.removeProgressCB(sid) + self.host.removeProgressCB(sid, profile) if failure_reason: - self._killId(sid, False, failure_reason) + self._killId(sid, False, failure_reason, profile=profile) else: - self._killId(sid, True) + self._killId(sid, True, profile=profile) class XEP_0047_handler(XMPPHandler): implements(iwokkel.IDisco)
--- a/src/plugins/plugin_xep_0054.py Sun Nov 04 23:53:26 2012 +0100 +++ b/src/plugins/plugin_xep_0054.py Sat Nov 10 16:38:16 2012 +0100 @@ -157,15 +157,15 @@ if answer.firstChildElement().name == "vCard": d = self.vCard2Dict(answer.firstChildElement(), jid.JID(answer["from"]), profile) - d.addCallback(lambda data: self.host.bridge.actionResult("RESULT", answer['id'], data)) + d.addCallback(lambda data: self.host.bridge.actionResult("RESULT", answer['id'], data, profile)) else: error (_("FIXME: vCard not found as first child element")) - self.host.bridge.actionResult("SUPPRESS", answer['id'], {}) #FIXME: maybe an error message would be better + self.host.bridge.actionResult("SUPPRESS", answer['id'], {}, profile) #FIXME: maybe an error message would be better - def vcard_err(self, failure): + def vcard_err(self, failure, profile): """Called when something is wrong with registration""" error (_("Can't find VCard of %s") % failure.value.stanza['from']) - self.host.bridge.actionResult("SUPPRESS", failure.value.stanza['id'], {}) #FIXME: maybe an error message would be better + self.host.bridge.actionResult("SUPPRESS", failure.value.stanza['id'], {}, profile) #FIXME: maybe an error message would be better def getCard(self, target, profile_key='@DEFAULT@'): """Ask server for VCard @@ -182,7 +182,7 @@ reg_request["from"]=current_jid.full() reg_request["to"] = to_jid.userhost() reg_request.addElement('vCard', NS_VCARD) - reg_request.send(to_jid.userhost()).addCallbacks(self.vcard_ok, self.vcard_err, [profile]) + reg_request.send(to_jid.userhost()).addCallbacks(self.vcard_ok, self.vcard_err, callbackArgs=[profile], errbackArgs=[profile]) return reg_request["id"] def getAvatarFile(self, hash):
--- a/src/plugins/plugin_xep_0065.py Sun Nov 04 23:53:26 2012 +0100 +++ b/src/plugins/plugin_xep_0065.py Sat Nov 10 16:38:16 2012 +0100 @@ -58,13 +58,13 @@ from logging import debug, info, warning, error from twisted.internet import protocol, reactor from twisted.internet import error as jab_error -from twisted.words.protocols.jabber import client, jid +from twisted.words.protocols.jabber import jid, client as jabber_client from twisted.protocols.basic import FileSender from twisted.words.xish import domish from twisted.web.client import getPage +from sat.core.exceptions import ProfileNotInCacheError import struct -import urllib -import hashlib, pdb +import hashlib from zope.interface import implements @@ -298,10 +298,10 @@ if self.factory.proxy: self.state = STATE_READY - self.factory.activateCb(self.sid, self.factory.iq_id, self.startTransfer) + self.factory.activateCb(self.sid, self.factory.iq_id, self.startTransfer, self.profile) else: self.state = STATE_TARGET_READY - self.factory.activateCb(self.sid, self.factory.iq_id) + self.factory.activateCb(self.sid, self.factory.iq_id, self.profile) except struct.error, why: return None @@ -311,6 +311,7 @@ if isinstance(self.factory, Socks5ClientFactory): self.sid = self.factory.sid + self.profile = self.factory.profile self.data = self.factory.data self.state = STATE_TARGET_INITIAL self._startNegotiation() @@ -318,13 +319,16 @@ def connectRequested(self, addr, port): debug("connectRequested") - # Check that this session if expected + # Check that this session is expected if not self.factory.hash_sid_map.has_key(addr): #no: we refuse it - self.sendErrorReply(socks5.REPLY_CONN_REFUSED) + self.sendErrorReply(REPLY_CONN_REFUSED) return - self.sid = self.factory.hash_sid_map[addr] - self.factory.current_stream[self.sid]["start_transfer_cb"] = self.startTransfer + self.sid, self.profile = self.factory.hash_sid_map[addr] + client = self.factory.host.getClient(self.profile) + if not client: + raise ProfileNotInCacheError + client.xep_0065_current_stream[self.sid]["start_transfer_cb"] = self.startTransfer self.connectCompleted(addr, 0) self.transport.stopReading() @@ -336,7 +340,7 @@ def fileTransfered(self, d): info(_("File transfer completed, closing connection")) self.transport.loseConnection() - self.factory.finishedCb(self.sid, True) + self.factory.finishedCb(self.sid, True, self.profile) def connectCompleted(self, remotehost, remoteport): debug("connectCompleted") @@ -395,8 +399,8 @@ class Socks5ServerFactory(protocol.ServerFactory): protocol = SOCKSv5 - def __init__(self, current_stream, hash_sid_map, finishedCb): - self.current_stream = current_stream + def __init__(self, host, hash_sid_map, finishedCb): + self.host = host self.hash_sid_map = hash_sid_map self.finishedCb = finishedCb @@ -409,27 +413,30 @@ class Socks5ClientFactory(protocol.ClientFactory): protocol = SOCKSv5 - def __init__(self, current_stream, sid, iq_id, activateCb, finishedCb, proxy=False): + def __init__(self, current_stream, sid, iq_id, activateCb, finishedCb, proxy=False, profile=None): """Init the Client Factory @param current_stream: current streams data @param sid: Session ID @param iq_id: iq id used to initiate the stream @param activateCb: method to call to activate the stream @param finishedCb: method to call when the stream session is finished - @param proxy: True if we are connecting throught a proxy (and we are a requester)""" + @param proxy: True if we are connecting throught a proxy (and we are a requester) + @param profile: %(doc_profile)s""" + assert(profile) self.data = current_stream[sid] self.sid = sid self.iq_id = iq_id self.activateCb = activateCb self.finishedCb = finishedCb self.proxy = proxy + self.profile = profile def startedConnecting(self, connector): debug (_("Socks 5 client connection started")) def clientConnectionLost(self, connector, reason): - debug (_("Socks 5 client connection lost")) - self.finishedCb(self.sid, reason.type == jab_error.ConnectionDone) #TODO: really check if the state is actually successful + debug (_("Socks 5 client connection lost (reason: %s)"), reason) + self.finishedCb(self.sid, reason.type == jab_error.ConnectionDone, self.profile) #TODO: really check if the state is actually successful class XEP_0065(): @@ -458,12 +465,11 @@ info(_("Plugin XEP_0065 initialization")) #session data - self.current_stream = {} #key: stream_id, value: data(dict) - self.hash_sid_map = {} #key: hash of the transfer session, value: session id + self.hash_sid_map = {} #key: hash of the transfer session, value: (session id, profile) self.host = host debug(_("registering")) - self.server_factory = Socks5ServerFactory(self.current_stream, self.hash_sid_map, self._killId) + self.server_factory = Socks5ServerFactory(host, self.hash_sid_map, lambda sid, success, profile: self._killId(sid, success, profile=profile)) #parameters host.memory.importParams(XEP_0065.params) @@ -476,53 +482,69 @@ def getHandler(self, profile): return XEP_0065_handler(self) + def profileConnected(self, profile): + client = self.host.getClient(profile) + if not client: + raise ProfileNotInCacheError + client.xep_0065_current_stream = {} #key: stream_id, value: data(dict) + def getExternalIP(self): """Return IP visible from outside, by asking to a website""" return getPage("http://www.goffi.org/sat_tools/get_ip.php") - def getProgress(self, sid, data): + def getProgress(self, sid, data, profile): """Fill data with position of current transfer""" + client = self.host.getClient(profile) + if not client: + raise ProfileNotInCacheError try: - file_obj = self.current_stream[sid]["file_obj"] + file_obj = client.xep_0065_current_stream[sid]["file_obj"] data["position"] = str(file_obj.tell()) - data["size"] = str(self.current_stream[sid]["size"]) + data["size"] = str(client.xep_0065_current_stream[sid]["size"]) except: pass - def _timeOut(self, sid): + def _timeOut(self, sid, profile): """Delecte current_stream id, called after timeout - @param id: id of self.current_stream""" - info(_("Socks5 Bytestream: TimeOut reached for id %s") % sid); - self._killId(sid, False, "TIMEOUT") + @param id: id of client.xep_0065_current_stream""" + info(_("Socks5 Bytestream: TimeOut reached for id %s [%s]") % (sid, profile)); + self._killId(sid, False, "TIMEOUT", profile) - def _killId(self, sid, success=False, failure_reason="UNKNOWN"): + def _killId(self, sid, success=False, failure_reason="UNKNOWN", profile=None): """Delete an current_stream id, clean up associated observers - @param sid: id of self.current_stream""" - if not self.current_stream.has_key(sid): + @param sid: id of client.xep_0065_current_stream""" + assert(profile) + client = self.host.getClient(profile) + if not client: + warning(_("Client no more in cache")) + return + if not client.xep_0065_current_stream.has_key(sid): warning(_("kill id called on a non existant id")) return - if self.current_stream[sid].has_key("observer_cb"): - xmlstream = self.current_stream[sid]["xmlstream"] - xmlstream.removeObserver(self.current_stream[sid]["event_data"], self.current_stream[sid]["observer_cb"]) - if self.current_stream[sid]['timer'].active(): - self.current_stream[sid]['timer'].cancel() - if self.current_stream[sid].has_key("size"): - self.host.removeProgressCB(sid) + if client.xep_0065_current_stream[sid].has_key("observer_cb"): + xmlstream = client.xep_0065_current_stream[sid]["xmlstream"] + xmlstream.removeObserver(client.xep_0065_current_stream[sid]["event_data"], client.xep_0065_current_stream[sid]["observer_cb"]) + if client.xep_0065_current_stream[sid]['timer'].active(): + client.xep_0065_current_stream[sid]['timer'].cancel() + if client.xep_0065_current_stream[sid].has_key("size"): + self.host.removeProgressCB(sid, profile) - file_obj = self.current_stream[sid]['file_obj'] - success_cb = self.current_stream[sid]['success_cb'] - failure_cb = self.current_stream[sid]['failure_cb'] + file_obj = client.xep_0065_current_stream[sid]['file_obj'] + success_cb = client.xep_0065_current_stream[sid]['success_cb'] + failure_cb = client.xep_0065_current_stream[sid]['failure_cb'] - del self.current_stream[sid] - if self.hash_sid_map.has_key(sid): - del self.hash_sid_map[sid] + session_hash = client.xep_0065_current_stream[sid].get('hash') + del client.xep_0065_current_stream[sid] + if session_hash in self.hash_sid_map: + #FIXME: check that self.hash_sid_map is correctly cleaned in all cases (timeout, normal flow, etc). + del self.hash_sid_map[session_hash] if success: - success_cb(sid, file_obj, NS_BS) + success_cb(sid, file_obj, NS_BS, profile) else: - failure_cb(sid, file_obj, NS_BS, failure_reason) + failure_cb(sid, file_obj, NS_BS, failure_reason, profile) - def startStream(self, file_obj, to_jid, sid, length, successCb, failureCb, size = None, profile='@NONE@'): + def startStream(self, file_obj, to_jid, sid, length, successCb, failureCb, size = None, profile=None): """Launch the stream workflow @param file_obj: file_obj to send @param to_jid: JID of the recipient @@ -531,16 +553,21 @@ @param successCb: method to call when stream successfuly finished @param failureCb: method to call when something goes wrong @param profile: %(doc_profile)s""" + assert(profile) + client = self.host.getClient(profile) + if not client: + error(_("Unknown profile, this should not happen")) + raise ProfileNotInCacheError + if length != None: error(_('stream length not managed yet')) return; - profile_jid, xmlstream = self.host.getJidNStream(profile) - if not profile_jid or not xmlstream: - error(_("Unknown profile, this should not happen")) - return; - data = self.current_stream[sid] = {} - data["profile"] = profile - data["timer"] = reactor.callLater(TIMEOUT, self._timeOut, sid) + + profile_jid = client.jid + xmlstream = client.xmlstream + + data = client.xep_0065_current_stream[sid] = {} + data["timer"] = reactor.callLater(TIMEOUT, self._timeOut, sid, profile) data["file_obj"] = file_obj data["from"] = profile_jid data["to"] = to_jid @@ -548,11 +575,11 @@ data["failure_cb"] = failureCb data["xmlstream"] = xmlstream data["hash"] = calculateHash(profile_jid, to_jid, sid) - self.hash_sid_map[data["hash"]] = sid + self.hash_sid_map[data["hash"]] = (sid, profile) if size: data["size"] = size - self.host.registerProgressCB(sid, self.getProgress) - iq_elt = client.IQ(xmlstream,'set') + self.host.registerProgressCB(sid, self.getProgress, profile) + iq_elt = jabber_client.IQ(xmlstream,'set') iq_elt["from"] = profile_jid.full() iq_elt["to"] = to_jid.full() query_elt = iq_elt.addElement('query', NS_BS) @@ -570,20 +597,21 @@ streamhost['port'] = self.host.memory.getParamA("Proxy port", "File Transfer", profile_key=profile) streamhost['jid'] = self.host.memory.getParamA("Proxy", "File Transfer", profile_key=profile) - iq_elt.addCallback(self.iqResult, sid) + iq_elt.addCallback(self.iqResult, sid, profile) iq_elt.send() - def iqResult(self, sid, iq_elt): + def iqResult(self, sid, profile, iq_elt): """Called when the result of open iq is received""" if iq_elt["type"] == "error": warning(_("Transfer failed")) return - + client = self.host.getClient(profile) + if not client: + raise ProfileNotInCacheError try: - data = self.current_stream[sid] + data = client.xep_0065_current_stream[sid] file_obj = data["file_obj"] timer = data["timer"] - profile = data["profile"] except KeyError: error(_("Internal error, can't do transfer")) return @@ -607,15 +635,17 @@ if proxy_jid != streamhost_jid: warning(_("Proxy jid is not the same as in parameters, this should not happen")) return - factory = Socks5ClientFactory(self.current_stream, sid, None, self.activateProxyStream, self._killId, True) + factory = Socks5ClientFactory(client.xep_0065_current_stream, sid, None, self.activateProxyStream, lambda sid, success, profile: self._killId(sid, success, profile=profile), True, profile) reactor.connectTCP(proxy_host, int(proxy_port), factory) else: data["start_transfer_cb"](file_obj) #We now activate the stream - def activateProxyStream(self, sid, iq_id, start_transfer_cb): + def activateProxyStream(self, sid, iq_id, start_transfer_cb, profile): debug(_("activating stream")) - data = self.current_stream[sid] - profile = data['profile'] + client = self.host.getClient(profile) + if not client: + raise ProfileNotInCacheError + data = client.xep_0065_current_stream[sid] profile_jid, xmlstream = self.host.getJidNStream(profile) iq_elt = client.IQ(xmlstream,'set') @@ -634,22 +664,26 @@ else: start_transfer_cb(file_obj) - def prepareToReceive(self, from_jid, sid, file_obj, size, success_cb, failure_cb): + def prepareToReceive(self, from_jid, sid, file_obj, size, success_cb, failure_cb, profile): """Called when a bytestream is imminent @param from_jid: jid of the sender @param sid: Stream id @param file_obj: File object where data will be written @param size: full size of the data, or None if unknown @param success_cb: method to call when successfuly finished - @param failure_cb: method to call when something goes wrong""" - data = self.current_stream[sid] = {} + @param failure_cb: method to call when something goes wrong + @param profile: %(doc_profile)s""" + client = self.host.getClient(profile) + if not client: + raise ProfileNotInCacheError + data = client.xep_0065_current_stream[sid] = {} data["from"] = from_jid data["file_obj"] = file_obj data["seq"] = -1 if size: data["size"] = size - self.host.registerProgressCB(sid, self.getProgress) - data["timer"] = reactor.callLater(TIMEOUT, self._timeOut, sid) + self.host.registerProgressCB(sid, self.getProgress, profile) + data["timer"] = reactor.callLater(TIMEOUT, self._timeOut, sid, profile) data["success_cb"] = success_cb data["failure_cb"] = failure_cb @@ -657,20 +691,26 @@ def streamQuery(self, iq_elt, profile): """Get file using byte stream""" debug(_("BS stream query")) - profile_jid, xmlstream = self.host.getJidNStream(profile) + client = self.host.getClient(profile) + + if not client: + raise ProfileNotInCacheError + + xmlstream = client.xmlstream + iq_elt.handled = True query_elt = iq_elt.firstChildElement() sid = query_elt.getAttribute("sid") streamhost_elts = filter(lambda elt: elt.name == 'streamhost', query_elt.elements()) - if not sid in self.current_stream: + if not sid in client.xep_0065_current_stream: warning(_("Ignoring unexpected BS transfer: %s" % sid)) self.sendNotAcceptableError(iq_elt['id'], iq_elt['from'], xmlstream) return - self.current_stream[sid]['timer'].cancel() - self.current_stream[sid]["to"] = jid.JID(iq_elt["to"]) - self.current_stream[sid]["xmlstream"] = xmlstream + client.xep_0065_current_stream[sid]['timer'].cancel() + client.xep_0065_current_stream[sid]["to"] = jid.JID(iq_elt["to"]) + client.xep_0065_current_stream[sid]["xmlstream"] = xmlstream if not streamhost_elts: warning(_("No streamhost found in stream query %s" % sid)) @@ -686,16 +726,19 @@ self.sendBadRequestError(iq_elt['id'], iq_elt['from'], xmlstream) return - self.current_stream[sid]["streamhost"] = (sh_host, sh_port, sh_jid) + client.xep_0065_current_stream[sid]["streamhost"] = (sh_host, sh_port, sh_jid) info (_("Stream proposed: host=[%(host)s] port=[%(port)s]") % {'host':sh_host, 'port':sh_port}) - factory = Socks5ClientFactory(self.current_stream, sid, iq_elt["id"], self.activateStream, self._killId) + factory = Socks5ClientFactory(client.xep_0065_current_stream, sid, iq_elt["id"], self.activateStream, lambda sid, success, profile: self._killId(sid, success, profile=profile), profile=profile) reactor.connectTCP(sh_host, int(sh_port), factory) - def activateStream(self, sid, iq_id): + def activateStream(self, sid, iq_id, profile): + client = self.host.getClient(profile) + if not client: + raise ProfileNotInCacheError debug(_("activating stream")) result = domish.Element((None, 'iq')) - data = self.current_stream[sid] + data = client.xep_0065_current_stream[sid] result['type'] = 'result' result['id'] = iq_id result['from'] = data["to"].full() @@ -769,7 +812,7 @@ if not proxy_ent: debug(_("No proxy found on this server")) return - iq_elt = client.IQ(self.parent.xmlstream,'get') + iq_elt = jabber_client.IQ(self.parent.xmlstream,'get') iq_elt["to"] = proxy_ent.full() query_elt = iq_elt.addElement('query', NS_BS) iq_elt.addCallback(self._proxyDataResult)
--- a/src/plugins/plugin_xep_0077.py Sun Nov 04 23:53:26 2012 +0100 +++ b/src/plugins/plugin_xep_0077.py Sat Nov 10 16:38:16 2012 +0100 @@ -54,7 +54,7 @@ """Add a callback which is called when registration to target is successful""" self.triggers[target] = (cb, profile) - def reg_ok(self, answer): + def reg_ok(self, answer, profile): """Called after the first get IQ""" try: x_elem = filter (lambda x:x.name == "x", answer.firstChildElement().elements())[0] #We only want the "x" element (data form) @@ -63,47 +63,47 @@ #TODO: manage registration without data form answer_data={"reason": "unmanaged", "message":_("This gateway can't be managed by SàT, sorry :(")} answer_type = "ERROR" - self.host.bridge.actionResult(answer_type, answer['id'], answer_data) + self.host.bridge.actionResult(answer_type, answer['id'], answer_data, profile) return form = data_form.Form.fromElement(x_elem) xml_data = dataForm2xml(form) - self.host.bridge.actionResult("XMLUI", answer['id'], {"target":answer["from"], "type":"registration", "xml":xml_data}) + self.host.bridge.actionResult("XMLUI", answer['id'], {"target":answer["from"], "type":"registration", "xml":xml_data}, profile) - def reg_err(self, failure): + def reg_err(self, failure, profile): """Called when something is wrong with registration""" info (_("Registration failure: %s") % str(failure.value)) answer_data = {} answer_data['reason'] = 'unknown' answer_data={"message":"%s [code: %s]" % (failure.value.condition, unicode(failure.value))} answer_type = "ERROR" - self.host.bridge.actionResult(answer_type, failure.value.stanza['id'], answer_data) + self.host.bridge.actionResult(answer_type, failure.value.stanza['id'], answer_data, profile) - def unregistrationAnswer(self, answer): + def unregistrationAnswer(self, answer, profile): debug (_("registration answer: %s") % answer.toXml()) answer_type = "SUCCESS" answer_data={"message":_("Your are now unregistred")} - self.host.bridge.actionResult(answer_type, answer['id'], answer_data) + self.host.bridge.actionResult(answer_type, answer['id'], answer_data, profile) - def unregistrationFailure(self, failure): + def unregistrationFailure(self, failure, profile): info (_("Unregistration failure: %s") % str(failure.value)) answer_type = "ERROR" answer_data = {} answer_data['reason'] = 'unknown' answer_data={"message":_("Unregistration failed: %s") % failure.value.condition} - self.host.bridge.actionResult(answer_type, failure.value.stanza['id'], answer_data) + self.host.bridge.actionResult(answer_type, failure.value.stanza['id'], answer_data, profile) - def registrationAnswer(self, answer): + def registrationAnswer(self, answer, profile): debug (_("registration answer: %s") % answer.toXml()) answer_type = "SUCCESS" answer_data={"message":_("Registration successfull")} - self.host.bridge.actionResult(answer_type, answer['id'], answer_data) + self.host.bridge.actionResult(answer_type, answer['id'], answer_data, profile) if self.triggers.has_key(answer["from"]): callback,profile = self.triggers[answer["from"]] callback(answer["from"], profile) del self.triggers[answer["from"]] - def registrationFailure(self, failure): + def registrationFailure(self, failure, profile): info (_("Registration failure: %s") % str(failure.value)) print failure.value.stanza.toXml() answer_type = "ERROR" @@ -114,7 +114,7 @@ else: answer_data['reason'] = 'unknown' answer_data={"message":_("Registration failed")} - self.host.bridge.actionResult(answer_type, failure.value.stanza['id'], answer_data) + self.host.bridge.actionResult(answer_type, failure.value.stanza['id'], answer_data, profile) if self.triggers.has_key(answer["from"]): del self.triggers[answer["from"]] @@ -122,22 +122,22 @@ """Submit a form for registration, using data_form""" id, deferred = self.host.submitForm(action, target, fields, profile) if action == 'CANCEL': - deferred.addCallbacks(self.unregistrationAnswer, self.unregistrationFailure) + deferred.addCallbacks(self.unregistrationAnswer, self.unregistrationFailure, callbackArgs=[profile], errbackArgs=[profile]) else: - deferred.addCallbacks(self.registrationAnswer, self.registrationFailure) + deferred.addCallbacks(self.registrationAnswer, self.registrationFailure, callbackArgs=[profile], errbackArgs=[profile]) return id def in_band_register(self, target, profile_key='@DEFAULT@'): """register to a target JID""" - current_jid, xmlstream = self.host.getJidNStream(profile_key) - if not xmlstream: + client = self.host.getClient(profile_key) + if not client: error (_('Asking for an non-existant or not connected profile')) return "" to_jid = jid.JID(target) debug(_("Asking registration for [%s]") % to_jid.full()) - reg_request=IQ(xmlstream,'get') - reg_request["from"]=current_jid.full() + reg_request=IQ(client.xmlstream,'get') + reg_request["from"]=client.jid.full() reg_request["to"] = to_jid.full() query=reg_request.addElement('query', NS_REG) - reg_request.send(to_jid.full()).addCallbacks(self.reg_ok, self.reg_err) + reg_request.send(to_jid.full()).addCallbacks(self.reg_ok, self.reg_err, callbackArgs=[profile], errbackArgs=[profile]) return reg_request["id"]
--- a/src/plugins/plugin_xep_0096.py Sun Nov 04 23:53:26 2012 +0100 +++ b/src/plugins/plugin_xep_0096.py Sat Nov 10 16:38:16 2012 +0100 @@ -21,16 +21,14 @@ from logging import debug, info, warning, error from twisted.words.xish import domish -from twisted.internet import protocol -from twisted.words.protocols.jabber import client, jid +from twisted.words.protocols.jabber import jid from twisted.words.protocols.jabber import error as jab_error import os, os.path from twisted.internet import reactor -import pdb +from sat.core.exceptions import ProfileNotInCacheError -from zope.interface import implements -from wokkel import disco, iwokkel, data_form +from wokkel import data_form IQ_SET = '/iq[@type="set"]' PROFILE_NAME = "file-transfer" @@ -52,19 +50,23 @@ def __init__(self, host): info(_("Plugin XEP_0096 initialization")) self.host = host - self._waiting_for_approval = {} #key = id, value = [transfer data, IdelayedCall Reactor timeout, - # current stream method, [failed stream methods], profile] - self.managed_stream_m = [self.host.plugins["XEP-0065"].NAMESPACE, + self.managed_stream_m = [#self.host.plugins["XEP-0065"].NAMESPACE, self.host.plugins["XEP-0047"].NAMESPACE] #Stream methods managed self.host.plugins["XEP-0095"].registerSIProfile(PROFILE_NAME, self.transferRequest) host.bridge.addMethod("sendFile", ".plugin", in_sign='ssa{ss}s', out_sign='s', method=self.sendFile) - def _kill_id(self, approval_id): + def profileConnected(self, profile): + client = self.host.getClient(profile) + client._xep_0096_waiting_for_approval = {} #key = id, value = [transfer data, IdelayedCall Reactor timeout, + # current stream method, [failed stream methods], profile] + + def _kill_id(self, approval_id, profile): """Delete a waiting_for_approval id, called after timeout - @param approval_id: id of _waiting_for_approval""" + @param approval_id: id of _xep_0096_waiting_for_approval""" info(_("SI File Transfer: TimeOut reached for id %s") % approval_id); try: - del self._waiting_for_approval[approval_id] + client = self.host.getClient(profile) + del client._xep_0096_waiting_for_approval[approval_id] except KeyError: warning(_("kill id called on a non existant approval id")) @@ -78,6 +80,9 @@ @param profile: %(doc_profile)s""" info (_("XEP-0096 file transfer requested")) debug(si_el.toXml()) + client = self.host.getClient(profile) + if not client: + raise ProfileNotInCacheError filename = "" file_size = "" file_date = None @@ -124,9 +129,9 @@ #if we are here, the transfer can start, we just need user's agreement data={ "filename":filename, "id": iq_id, "from":from_jid, "size":file_size, "date":file_date, "hash":file_hash, "desc":file_desc, "can_range": str(can_range) } - self._waiting_for_approval[si_id] = [data, reactor.callLater(300, self._kill_id, si_id), stream_method, [], profile] + client._xep_0096_waiting_for_approval[si_id] = [data, reactor.callLater(300, self._kill_id, si_id, profile), stream_method, []] - self.host.askConfirmation(si_id, "FILE_TRANSFER", data, self.confirmationCB) + self.host.askConfirmation(si_id, "FILE_TRANSFER", data, self.confirmationCB, profile) def _getFileObject(self, dest_path, can_range = False): @@ -136,12 +141,15 @@ @return: File Object""" return open(dest_path, "ab" if can_range else "wb") - def confirmationCB(self, sid, accepted, frontend_data): + def confirmationCB(self, sid, accepted, frontend_data, profile): """Called on confirmation answer @param sid: file transfer session id @param accepted: True if file transfer is accepted @param frontend_data: data sent by frontend""" - data, timeout, stream_method, failed_methods, profile = self._waiting_for_approval[sid] + client = self.host.getClient(profile) + if not client: + raise ProfileNotInCacheError + data, timeout, stream_method, failed_methods = client._xep_0096_waiting_for_approval[sid] can_range = data['can_range'] == "True" range_offset = 0 if accepted: @@ -151,19 +159,19 @@ dest_path = frontend_data['dest_path'] except KeyError: error(_('dest path not found in frontend_data')) - del(self._waiting_for_approval[sid]) + del(client._xep_0096_waiting_for_approval[sid]) return if stream_method == self.host.plugins["XEP-0065"].NAMESPACE: file_obj = self._getFileObject(dest_path, can_range) range_offset = file_obj.tell() - self.host.plugins["XEP-0065"].prepareToReceive(jid.JID(data['from']), sid, file_obj, int(data["size"]), self._transferSucceeded, self._transferFailed) + self.host.plugins["XEP-0065"].prepareToReceive(jid.JID(data['from']), sid, file_obj, int(data["size"]), self._transferSucceeded, self._transferFailed, profile) elif stream_method == self.host.plugins["XEP-0047"].NAMESPACE: file_obj = self._getFileObject(dest_path, can_range) range_offset = file_obj.tell() - self.host.plugins["XEP-0047"].prepareToReceive(jid.JID(data['from']), sid, file_obj, int(data["size"]), self._transferSucceeded, self._transferFailed) + self.host.plugins["XEP-0047"].prepareToReceive(jid.JID(data['from']), sid, file_obj, int(data["size"]), self._transferSucceeded, self._transferFailed, profile) else: error(_("Unknown stream method, this should not happen at this stage, cancelling transfer")) - del(self._waiting_for_approval[sid]) + del(client._xep_0096_waiting_for_approval[sid]) return #we can send the iq result @@ -179,30 +187,37 @@ else: debug (_("Transfer [%s] refused"), sid) self.host.plugins["XEP-0095"].sendRejectedError (data["id"], data['from'], profile=profile) - del(self._waiting_for_approval[sid]) + del(client._xep_0096_waiting_for_approval[sid]) - def _transferSucceeded(self, sid, file_obj, stream_method): + def _transferSucceeded(self, sid, file_obj, stream_method, profile): """Called by the stream method when transfer successfuly finished @param id: stream id""" + client = self.host.getClient(profile) + if not client: + raise ProfileNotInCacheError file_obj.close() info(_('Transfer %s successfuly finished') % sid) - del(self._waiting_for_approval[sid]) + del(client._xep_0096_waiting_for_approval[sid]) - def _transferFailed(self, sid, file_obj, stream_method, reason): + def _transferFailed(self, sid, file_obj, stream_method, reason, profile): """Called when something went wrong with the transfer @param id: stream id @param reason: can be TIMEOUT, IO_ERROR, PROTOCOL_ERROR""" - data, timeout, stream_method, failed_methods, profile = self._waiting_for_approval[sid] - warning(_('Transfer %(id)s failed with stream method %(s_method)s') % { 'id': sid, - 's_method': stream_method }) + client = self.host.getClient(profile) + if not client: + raise ProfileNotInCacheError + data, timeout, stream_method, failed_methods = client._xep_0096_waiting_for_approval[sid] + warning(_('Transfer %(id)s failed with stream method %(s_method)s: %(reason)s') % { 'id': sid, + 's_method': stream_method, + 'reason': reason}) filepath = file_obj.name file_obj.close() os.remove(filepath) #TODO: session remenber (within a time limit) when a stream method fail, and avoid that stream method with full jid for the rest of the session warning(_("All stream methods failed, can't transfer the file")) - del(self._waiting_for_approval[sid]) + del(client._xep_0096_waiting_for_approval[sid]) - def fileCb(self, profile, filepath, sid, size, IQ): + def fileCb(self, filepath, sid, size, profile, IQ): if IQ['type'] == "error": stanza_err = jab_error.exceptionFromStanza(IQ) if stanza_err.code == '403' and stanza_err.condition == 'forbidden': @@ -252,7 +267,7 @@ else: warning(_("Invalid stream method received")) - def sendFile(self, to_jid, filepath, data={}, profile_key='@DEFAULT@'): + def sendFile(self, to_jid, filepath, data={}, profile_key='@NONE@'): """send a file using XEP-0096 @to_jid: recipient @filepath: absolute path to the file to send @@ -278,13 +293,13 @@ file_transfer_elts.append(domish.Element((None,'range'))) sid, offer = self.host.plugins["XEP-0095"].proposeStream(jid.JID(to_jid), PROFILE, feature_elt, file_transfer_elts, profile_key = profile) - offer.addCallback(self.fileCb, profile, filepath, sid, size) + offer.addCallback(self.fileCb, filepath, sid, size, profile) return sid - def sendSuccessCb(self, sid, file_obj, stream_method): - info(_('Transfer %s successfuly finished') % sid) + def sendSuccessCb(self, sid, file_obj, stream_method, profile): + info(_('Transfer %s successfuly finished [%s]') % (sid, profile)) file_obj.close() - def sendFailureCb(self, sid, file_obj, stream_method, reason): + def sendFailureCb(self, sid, file_obj, stream_method, reason, profile): file_obj.close() - warning(_('Transfer %(id)s failed with stream method %(s_method)s') % { 'id': sid, "s_method": stream_method }) + warning(_('Transfer %(id)s failed with stream method %(s_method)s: %(reason)s [%(profile)s') % { 'id': sid, "s_method": stream_method, 'reason': reason, 'profile': profile })