Mercurial > libervia-backend
changeset 425:e4e9187e3b5b
backend, bridge: asynchronous history
quick_frontend: use of asynchronous history
author | Goffi <goffi@goffi.org> |
---|---|
date | Tue, 08 Nov 2011 01:08:11 +0100 (2011-11-08) |
parents | 72c13313b6d6 |
children | ae446194c20c |
files | frontends/src/bridge/DBus.py frontends/src/quick_frontend/quick_chat.py src/bridge/DBus.py src/bridge/bridge_constructor/bridge_template.ini src/bridge/bridge_constructor/dbus_core_template.py src/core/sat_main.py src/core/xmpp.py src/test/helpers.py src/tools/memory.py src/tools/sqlite.py |
diffstat | 10 files changed, 94 insertions(+), 79 deletions(-) [+] |
line wrap: on
line diff
--- a/frontends/src/bridge/DBus.py Mon Nov 07 22:27:07 2011 +0100 +++ b/frontends/src/bridge/DBus.py Tue Nov 08 01:08:11 2011 +0100 @@ -96,8 +96,8 @@ def getContacts(self, profile_key="@DEFAULT@"): return self.db_core_iface.getContacts(profile_key) - def getHistory(self, from_jid, to_jid, size): - return self.db_core_iface.getHistory(from_jid, to_jid, size) + def getHistory(self, from_jid, to_jid, limit, between=True, callback=None, errback=None): + return self.db_core_iface.getHistory(from_jid, to_jid, limit, between, reply_handler=callback, error_handler=lambda err:errback(err._dbus_error_name[len(const_ERROR_PREFIX)+1:])) def getLastResource(self, contact_jid, profile_key="@DEFAULT@"): return unicode(self.db_core_iface.getLastResource(contact_jid, profile_key))
--- a/frontends/src/quick_frontend/quick_chat.py Mon Nov 07 22:27:07 2011 +0100 +++ b/frontends/src/quick_frontend/quick_chat.py Tue Nov 08 01:08:11 2011 +0100 @@ -87,13 +87,18 @@ def historyPrint(self, size=20, keep_last=False, profile='@NONE@'): """Print the initial history""" debug (_("now we print history")) - history=self.host.bridge.getHistory(self.host.profiles[profile]['whoami'].short, self.target, 20) - stamps=history.keys() - stamps.sort() - for stamp in stamps: - self.printMessage(JID(history[stamp][0]), history[stamp][1], profile, stamp) - if keep_last: ##FIXME hack for sortilege - self.last_history = stamps[-1] if stamps else None + def onHistory(history): + stamps=history.keys() + stamps.sort() + for stamp in stamps: + from_jid, to_jid, message = history[stamp] + self.printMessage(JID(from_jid), message, profile, stamp) + if keep_last: ##FIXME hack for sortilege + self.last_history = stamps[-1] if stamps else None + def onHistoryError(err): + error (_("Can't get history")) + + history=self.host.bridge.getHistory(self.host.profiles[profile]['whoami'].short, self.target.short, 20, callback=onHistory, errback=onHistoryError) def _get_nick(self, jid): """Return nick of this jid when possible"""
--- a/src/bridge/DBus.py Mon Nov 07 22:27:07 2011 +0100 +++ b/src/bridge/DBus.py Tue Nov 08 01:08:11 2011 +0100 @@ -80,13 +80,13 @@ result = self.cb[name](*args, **kwargs) if async: if not isinstance(result, Deferred): - error("Asynchrone method [%s] does not return a Deferred." % name) + error("Asynchronous method [%s] does not return a Deferred." % name) raise AsyncNotDeferred result.addCallback(callback) result.addErrback(lambda err:errback(GenericException(err))) else: if isinstance(result, Deferred): - error("Synchrone method [%s] return a Deferred." % name) + error("Synchronous method [%s] return a Deferred." % name) raise DeferredNotAsync return result @@ -252,10 +252,10 @@ return self._callback("getContacts", unicode(profile_key)) @dbus.service.method(const_INT_PREFIX+const_CORE_SUFFIX, - in_signature='ssi', out_signature='a{i(ss)}', - async_callbacks=None) - def getHistory(self, from_jid, to_jid, size): - return self._callback("getHistory", unicode(from_jid), unicode(to_jid), size) + in_signature='ssib', out_signature='a{i(sss)}', + async_callbacks=('callback', 'errback')) + def getHistory(self, from_jid, to_jid, limit, between=True, callback=None, errback=None): + return self._callback("getHistory", unicode(from_jid), unicode(to_jid), limit, between, callback=callback, errback=errback) @dbus.service.method(const_INT_PREFIX+const_CORE_SUFFIX, in_signature='ss', out_signature='s',
--- a/src/bridge/bridge_constructor/bridge_template.ini Mon Nov 07 22:27:07 2011 +0100 +++ b/src/bridge/bridge_constructor/bridge_template.ini Tue Nov 08 01:08:11 2011 +0100 @@ -434,15 +434,18 @@ doc_return=list of categories [getHistory] +async= type=method category=core -sig_in=ssi -sig_out=a{i(ss)} +sig_in=ssib +sig_out=a{i(sss)} +param_3_default=True doc=Get history of a communication between two entities -doc_param_0=from_jid: source JID -doc_param_1=to_jid: dest JID -doc_param_2=size: size of the history (0 for the whole history) -doc_return=Dict where key is timestamp (seconds this the Epoch), and value is a tuple (from_jid, message) +doc_param_0=from_jid: source JID (bare jid for catch all, full jid else) +doc_param_1=to_jid: dest JID (bare jid for catch all, full jid else) +doc_param_2=limit: max number of history elements to get (0 for the whole history) +doc_param_3=between: True if we want history between the two jids (in both direction), False if we only want messages from from_jid to to_jid +doc_return=Dict where key is timestamp (seconds this the Epoch), and value is a tuple (full from_jid, full to_jid, message) [addContact] type=method
--- a/src/bridge/bridge_constructor/dbus_core_template.py Mon Nov 07 22:27:07 2011 +0100 +++ b/src/bridge/bridge_constructor/dbus_core_template.py Tue Nov 08 01:08:11 2011 +0100 @@ -80,13 +80,13 @@ result = self.cb[name](*args, **kwargs) if async: if not isinstance(result, Deferred): - error("Asynchrone method [%s] does not return a Deferred." % name) + error("Asynchronous method [%s] does not return a Deferred." % name) raise AsyncNotDeferred result.addCallback(callback) result.addErrback(lambda err:errback(GenericException(err))) else: if isinstance(result, Deferred): - error("Synchrone method [%s] return a Deferred." % name) + error("Synchronous method [%s] return a Deferred." % name) raise DeferredNotAsync return result
--- a/src/core/sat_main.py Mon Nov 07 22:27:07 2011 +0100 +++ b/src/core/sat_main.py Tue Nov 08 01:08:11 2011 +0100 @@ -438,7 +438,7 @@ message.addElement("subject", "jabber:client", subject) message.addElement("body", "jabber:client", msg) self.profiles[profile].xmlstream.send(message) - self.memory.addToHistory(current_jid, current_jid, jid.JID(to), message["type"], unicode(msg)) + self.memory.addToHistory(current_jid, jid.JID(to), unicode(msg), profile=profile) if type!="groupchat": self.bridge.newMessage(message['from'], unicode(msg), mess_type=type, to_jid=message['to'], profile=profile) #We send back the message, so all clients are aware of it
--- a/src/core/xmpp.py Mon Nov 07 22:27:07 2011 +0100 +++ b/src/core/xmpp.py Tue Nov 08 01:08:11 2011 +0100 @@ -110,7 +110,7 @@ if e.name == "body": mess_type = message['type'] if message.hasAttribute('type') else 'normal' self.host.bridge.newMessage(message["from"], e.children[0], mess_type, message['to'], profile=self.parent.profile) - self.host.memory.addToHistory(self.parent.jid, jid.JID(message["from"]), self.parent.jid, "chat", e.children[0]) + self.host.memory.addToHistory(jid.JID(message["from"]), jid.JID(message["to"]), e.children[0], profile=self.parent.profile) break class SatRosterProtocol(xmppim.RosterClientProtocol):
--- a/src/test/helpers.py Mon Nov 07 22:27:07 2011 +0100 +++ b/src/test/helpers.py Tue Nov 08 01:08:11 2011 +0100 @@ -60,7 +60,7 @@ def getProfileName(self, profile_key): return profile_key - def addToHistory(self, me_jid, from_jid, to_jid, type, message): + def addToHistory(self, from_jid, to_jid, message, timestamp=None, profile=None): pass def addContact(self, contact_jid, attributes, groups, profile_key='@DEFAULT@'):
--- a/src/tools/memory.py Mon Nov 07 22:27:07 2011 +0100 +++ b/src/tools/memory.py Tue Nov 08 01:08:11 2011 +0100 @@ -34,7 +34,6 @@ from sat.tools.sqlite import SqliteStorage SAVEFILE_PARAM_XML="/param" #xml parameters template -SAVEFILE_HISTORY="/history" SAVEFILE_PRIVATE="/private" #file used to store misc values (mainly for plugins) SAVEFILE_DATABASE="/sat.db" @@ -501,12 +500,10 @@ self.presenceStatus={} self.lastResource={} #tmp, will be refactored with bdd integration self.subscriptions={} - self.history={} #used to store chat history (key: short jid) self.private={} #used to store private value self.server_features={} #used to store discovery's informations self.server_identities={} self.config = self.parseMainConf() - host.set_const('savefile_history', SAVEFILE_HISTORY) host.set_const('savefile_private', SAVEFILE_PRIVATE) host.set_const('savefile_database', SAVEFILE_DATABASE) database_file = os.path.expanduser(self.getConfig('','local_dir')+ @@ -547,8 +544,6 @@ """Load parameters and all memory things from file/db""" param_file_xml = os.path.expanduser(self.getConfig('','local_dir')+ self.host.get_const('savefile_param_xml')) - history_file = os.path.expanduser(self.getConfig('','local_dir')+ - self.host.get_const('savefile_history')) private_file = os.path.expanduser(self.getConfig('','local_dir')+ self.host.get_const('savefile_private')) @@ -564,16 +559,6 @@ info (_("No params template, using default template")) self.params.load_default_params() - - #history - if os.path.exists(history_file): - try: - with open(history_file, 'r') as history_pickle: - self.history=pickle.load(history_pickle) - debug(_("history loaded")) - except: - error (_("Can't load history !")) - #private if os.path.exists(private_file): try: @@ -604,16 +589,11 @@ #TODO: need to encrypt files (at least passwords !) and set permissions param_file_xml = os.path.expanduser(self.getConfig('','local_dir')+ self.host.get_const('savefile_param_xml')) - history_file = os.path.expanduser(self.getConfig('','local_dir')+ - self.host.get_const('savefile_history')) private_file = os.path.expanduser(self.getConfig('','local_dir')+ self.host.get_const('savefile_private')) self.params.save_xml(param_file_xml) debug(_("params saved")) - with open(history_file, 'w') as history_pickle: - pickle.dump(self.history, history_pickle) - debug(_("history saved")) with open(private_file, 'w') as private_pickle: pickle.dump(self.private, private_pickle) debug(_("private values saved")) @@ -645,39 +625,12 @@ @param name: Name of the profile""" return self.params.deleteProfile(name) - def addToHistory(self, me_jid, from_jid, to_jid, type, message): - me_short=me_jid.userhost() - from_short=from_jid.userhost() - to_short=to_jid.userhost() - - if from_jid==me_jid: - key=to_short - else: - key=from_short - - if not self.history.has_key(me_short): - self.history[me_short]={} - if not self.history[me_short].has_key(key): - self.history[me_short][key]={} + def addToHistory(self, from_jid, to_jid, message, timestamp=None, profile="@NONE@"): + assert(profile!="@NONE@") + return self.storage.addToHistory(from_jid, to_jid, message, timestamp, profile) - self.history[me_short][key][int(time.time())] = (from_jid.full(), message) - - def getHistory(self, from_jid, to_jid, size): - ret={} - if not self.history.has_key(from_jid): - error(_("source JID not found !")) - #TODO: throw an error here - return {} - if not self.history[from_jid].has_key(to_jid): - error(_("dest JID not found !")) - #TODO: throw an error here - return {} - stamps=self.history[from_jid][to_jid].keys() - stamps.sort() - for stamp in stamps[-size:]: - ret[stamp]=self.history[from_jid][to_jid][stamp] - - return ret + def getHistory(self, from_jid, to_jid, limit=0, between=True): + return self.storage.getHistory(jid.JID(from_jid), jid.JID(to_jid), limit, between) def setPrivate(self, key, value): """Save a misc private value (mainly useful for plugins)"""
--- a/src/tools/sqlite.py Mon Nov 07 22:27:07 2011 +0100 +++ b/src/tools/sqlite.py Tue Nov 08 01:08:11 2011 +0100 @@ -24,6 +24,7 @@ from twisted.enterprise import adbapi from twisted.internet import defer import os.path +import time class SqliteStorage(): """This class manage storage with Sqlite database""" @@ -44,7 +45,7 @@ info(_("The database is new, creating the tables")) database_creation = [ "CREATE TABLE profiles (id INTEGER PRIMARY KEY ASC, name TEXT, UNIQUE (name))", - "CREATE TABLE historic (id INTEGER PRIMARY KEY ASC, profile_id INTEGER, source TEXT, dest TEXT, source_res TEXT, dest_res TEXT, timestamp DATETIME, message TEXT, FOREIGN KEY(profile_id) REFERENCES profiles(id))", + "CREATE TABLE history (id INTEGER PRIMARY KEY ASC, profile_id INTEGER, source TEXT, dest TEXT, source_res TEXT, dest_res TEXT, timestamp DATETIME, message TEXT, FOREIGN KEY(profile_id) REFERENCES profiles(id))", "CREATE TABLE param_gen (category TEXT, name TEXT, value TEXT, PRIMARY KEY (category,name))", "CREATE TABLE param_ind (category TEXT, name TEXT, profile_id INTEGER, value TEXT, PRIMARY KEY (category,name,profile_id), FOREIGN KEY(profile_id) REFERENCES profiles(id))"] for op in database_creation: @@ -162,6 +163,59 @@ d.addErrback(lambda ignore: error(_("Can't set individual parameter (%(category)s/%(name)s) for [%(profile)s] in database" % {"category":category, "name":name, "profile":profile}))) return d + #History + def addToHistory(self, from_jid, to_jid, message, timestamp=None, profile=None): + """Store a new message in history + @param from_jid: full source JID + @param to_jid: full dest JID + @param message: message + @param timestamp: timestamp in seconds since epoch, or None to use current time + """ + assert(profile!=None) + d = self.dbpool.runQuery("INSERT INTO history(source, source_res, dest, dest_res, timestamp, message, profile_id) VALUES (?,?,?,?,?,?,?)", + (from_jid.userhost(), from_jid.resource, to_jid.userhost(), to_jid.resource, timestamp or int(time.time()), + message, self.profiles[profile])) + d.addErrback(lambda ignore: error(_("Can't save following message in history: from [%(from_jid)s] to [%(to_jid)s] ==> [%(message)s]" % + {"from_jid":from_jid.full(), "to_jid":to_jid.full(), "message":message}))) + return d + + def getHistory(self, from_jid, to_jid, limit=0, between=True): + """Store a new message in history + @param from_jid: source JID (full, or bare for catchall + @param to_jid: dest JID (full, or bare for catchall + @param size: maximum number of messages to get, or 0 for unlimited + """ + def sqliteToDict(result): + result_dict = {} + for row in result: + timestamp, source, source_res, dest, dest_res, message = row + result_dict[timestamp] = ("%s/%s" % (source, source_res) if source_res else source, + "%s/%s" % (dest, dest_res) if dest_res else dest, + message) + return result_dict + + query_parts = ["SELECT timestamp, source, source_res, dest, dest_res, message FROM history WHERE"] + values = [] + + if between: + query_parts.append("(source=? OR source=?) AND (dest=? or dest=?)") + values.extend([from_jid.userhost(), to_jid.userhost(), to_jid.userhost(), from_jid.userhost()]) + else: + query_parts.append("source=? AND dest=?") + values.extend([from_jid.userhost(), to_jid.userhost()]) + if from_jid.resource: + query_parts.append("AND source_res=?") + values.append(from_jid.resource) + if to_jid.resource: + query_parts.append("AND dest_res=?") + values.append(to_jid.resource) + if limit: + query_parts.append("LIMIT ?") + values.append(limit) + d = self.dbpool.runQuery(" ".join(query_parts), values) + return d.addCallback(sqliteToDict) + + ##Helper methods## def __getFirstResult(self, result):