changeset 425:e4e9187e3b5b

backend, bridge: asynchronous history quick_frontend: use of asynchronous history
author Goffi <goffi@goffi.org>
date Tue, 08 Nov 2011 01:08:11 +0100
parents 72c13313b6d6
children ae446194c20c
files frontends/src/bridge/DBus.py frontends/src/quick_frontend/quick_chat.py src/bridge/DBus.py src/bridge/bridge_constructor/bridge_template.ini src/bridge/bridge_constructor/dbus_core_template.py src/core/sat_main.py src/core/xmpp.py src/test/helpers.py src/tools/memory.py src/tools/sqlite.py
diffstat 10 files changed, 94 insertions(+), 79 deletions(-) [+]
line wrap: on
line diff
--- a/frontends/src/bridge/DBus.py	Mon Nov 07 22:27:07 2011 +0100
+++ b/frontends/src/bridge/DBus.py	Tue Nov 08 01:08:11 2011 +0100
@@ -96,8 +96,8 @@
     def getContacts(self, profile_key="@DEFAULT@"):
         return self.db_core_iface.getContacts(profile_key)
 
-    def getHistory(self, from_jid, to_jid, size):
-        return self.db_core_iface.getHistory(from_jid, to_jid, size)
+    def getHistory(self, from_jid, to_jid, limit, between=True, callback=None, errback=None):
+        return self.db_core_iface.getHistory(from_jid, to_jid, limit, between, reply_handler=callback, error_handler=lambda err:errback(err._dbus_error_name[len(const_ERROR_PREFIX)+1:]))
 
     def getLastResource(self, contact_jid, profile_key="@DEFAULT@"):
         return unicode(self.db_core_iface.getLastResource(contact_jid, profile_key))
--- a/frontends/src/quick_frontend/quick_chat.py	Mon Nov 07 22:27:07 2011 +0100
+++ b/frontends/src/quick_frontend/quick_chat.py	Tue Nov 08 01:08:11 2011 +0100
@@ -87,13 +87,18 @@
     def historyPrint(self, size=20, keep_last=False, profile='@NONE@'):
         """Print the initial history"""
         debug (_("now we print history"))
-        history=self.host.bridge.getHistory(self.host.profiles[profile]['whoami'].short, self.target, 20)
-        stamps=history.keys()
-        stamps.sort()
-        for stamp in stamps: 
-            self.printMessage(JID(history[stamp][0]), history[stamp][1], profile, stamp)
-        if keep_last:  ##FIXME hack for sortilege
-            self.last_history = stamps[-1] if stamps else None
+        def onHistory(history):
+            stamps=history.keys()
+            stamps.sort()
+            for stamp in stamps: 
+                from_jid, to_jid, message = history[stamp]
+                self.printMessage(JID(from_jid), message, profile, stamp)
+            if keep_last:  ##FIXME hack for sortilege
+                self.last_history = stamps[-1] if stamps else None
+        def onHistoryError(err):
+            error (_("Can't get history"))
+
+        history=self.host.bridge.getHistory(self.host.profiles[profile]['whoami'].short, self.target.short, 20, callback=onHistory, errback=onHistoryError)
 
     def _get_nick(self, jid):
         """Return nick of this jid when possible"""
--- a/src/bridge/DBus.py	Mon Nov 07 22:27:07 2011 +0100
+++ b/src/bridge/DBus.py	Tue Nov 08 01:08:11 2011 +0100
@@ -80,13 +80,13 @@
         result = self.cb[name](*args, **kwargs)
         if async:
             if not isinstance(result, Deferred):
-                error("Asynchrone method [%s] does not return a Deferred." % name)
+                error("Asynchronous method [%s] does not return a Deferred." % name)
                 raise AsyncNotDeferred
             result.addCallback(callback)
             result.addErrback(lambda err:errback(GenericException(err)))
         else:
             if isinstance(result, Deferred):
-                error("Synchrone method [%s] return a Deferred." % name)
+                error("Synchronous method [%s] return a Deferred." % name)
                 raise DeferredNotAsync
             return result
 
@@ -252,10 +252,10 @@
         return self._callback("getContacts", unicode(profile_key))
 
     @dbus.service.method(const_INT_PREFIX+const_CORE_SUFFIX,
-                         in_signature='ssi', out_signature='a{i(ss)}',
-                         async_callbacks=None)
-    def getHistory(self, from_jid, to_jid, size):
-        return self._callback("getHistory", unicode(from_jid), unicode(to_jid), size)
+                         in_signature='ssib', out_signature='a{i(sss)}',
+                         async_callbacks=('callback', 'errback'))
+    def getHistory(self, from_jid, to_jid, limit, between=True, callback=None, errback=None):
+        return self._callback("getHistory", unicode(from_jid), unicode(to_jid), limit, between, callback=callback, errback=errback)
 
     @dbus.service.method(const_INT_PREFIX+const_CORE_SUFFIX,
                          in_signature='ss', out_signature='s',
--- a/src/bridge/bridge_constructor/bridge_template.ini	Mon Nov 07 22:27:07 2011 +0100
+++ b/src/bridge/bridge_constructor/bridge_template.ini	Tue Nov 08 01:08:11 2011 +0100
@@ -434,15 +434,18 @@
 doc_return=list of categories
 
 [getHistory]
+async=
 type=method
 category=core
-sig_in=ssi
-sig_out=a{i(ss)}
+sig_in=ssib
+sig_out=a{i(sss)}
+param_3_default=True
 doc=Get history of a communication between two entities
-doc_param_0=from_jid: source JID
-doc_param_1=to_jid: dest JID
-doc_param_2=size: size of the history (0 for the whole history)
-doc_return=Dict where key is timestamp (seconds this the Epoch), and value is a tuple (from_jid, message)
+doc_param_0=from_jid: source JID (bare jid for catch all, full jid else)
+doc_param_1=to_jid: dest JID (bare jid for catch all, full jid else)
+doc_param_2=limit: max number of history elements to get (0 for the whole history)
+doc_param_3=between: True if we want history between the two jids (in both direction), False if we only want messages from from_jid to to_jid
+doc_return=Dict where key is timestamp (seconds this the Epoch), and value is a tuple (full from_jid, full to_jid, message)
 
 [addContact]
 type=method
--- a/src/bridge/bridge_constructor/dbus_core_template.py	Mon Nov 07 22:27:07 2011 +0100
+++ b/src/bridge/bridge_constructor/dbus_core_template.py	Tue Nov 08 01:08:11 2011 +0100
@@ -80,13 +80,13 @@
         result = self.cb[name](*args, **kwargs)
         if async:
             if not isinstance(result, Deferred):
-                error("Asynchrone method [%s] does not return a Deferred." % name)
+                error("Asynchronous method [%s] does not return a Deferred." % name)
                 raise AsyncNotDeferred
             result.addCallback(callback)
             result.addErrback(lambda err:errback(GenericException(err)))
         else:
             if isinstance(result, Deferred):
-                error("Synchrone method [%s] return a Deferred." % name)
+                error("Synchronous method [%s] return a Deferred." % name)
                 raise DeferredNotAsync
             return result
 
--- a/src/core/sat_main.py	Mon Nov 07 22:27:07 2011 +0100
+++ b/src/core/sat_main.py	Tue Nov 08 01:08:11 2011 +0100
@@ -438,7 +438,7 @@
             message.addElement("subject", "jabber:client", subject)
         message.addElement("body", "jabber:client", msg)
         self.profiles[profile].xmlstream.send(message)
-        self.memory.addToHistory(current_jid, current_jid, jid.JID(to), message["type"], unicode(msg))
+        self.memory.addToHistory(current_jid, jid.JID(to), unicode(msg), profile=profile)
         if type!="groupchat":
             self.bridge.newMessage(message['from'], unicode(msg), mess_type=type, to_jid=message['to'], profile=profile) #We send back the message, so all clients are aware of it
 
--- a/src/core/xmpp.py	Mon Nov 07 22:27:07 2011 +0100
+++ b/src/core/xmpp.py	Tue Nov 08 01:08:11 2011 +0100
@@ -110,7 +110,7 @@
         if e.name == "body":
           mess_type = message['type'] if message.hasAttribute('type') else 'normal'
           self.host.bridge.newMessage(message["from"], e.children[0], mess_type, message['to'], profile=self.parent.profile)
-          self.host.memory.addToHistory(self.parent.jid, jid.JID(message["from"]), self.parent.jid, "chat", e.children[0])
+          self.host.memory.addToHistory(jid.JID(message["from"]), jid.JID(message["to"]), e.children[0], profile=self.parent.profile)
           break
     
 class SatRosterProtocol(xmppim.RosterClientProtocol):
--- a/src/test/helpers.py	Mon Nov 07 22:27:07 2011 +0100
+++ b/src/test/helpers.py	Tue Nov 08 01:08:11 2011 +0100
@@ -60,7 +60,7 @@
     def getProfileName(self, profile_key):
         return profile_key
 
-    def addToHistory(self, me_jid, from_jid, to_jid, type, message):
+    def addToHistory(self, from_jid, to_jid, message, timestamp=None, profile=None):
         pass
     
     def addContact(self, contact_jid, attributes, groups, profile_key='@DEFAULT@'):
--- a/src/tools/memory.py	Mon Nov 07 22:27:07 2011 +0100
+++ b/src/tools/memory.py	Tue Nov 08 01:08:11 2011 +0100
@@ -34,7 +34,6 @@
 from sat.tools.sqlite import SqliteStorage
 
 SAVEFILE_PARAM_XML="/param" #xml parameters template
-SAVEFILE_HISTORY="/history"
 SAVEFILE_PRIVATE="/private"  #file used to store misc values (mainly for plugins)
 SAVEFILE_DATABASE="/sat.db"
 
@@ -501,12 +500,10 @@
         self.presenceStatus={}
         self.lastResource={} #tmp, will be refactored with bdd integration
         self.subscriptions={}
-        self.history={}  #used to store chat history (key: short jid)
         self.private={}  #used to store private value
         self.server_features={} #used to store discovery's informations
         self.server_identities={}
         self.config = self.parseMainConf()
-        host.set_const('savefile_history', SAVEFILE_HISTORY)
         host.set_const('savefile_private', SAVEFILE_PRIVATE)
         host.set_const('savefile_database', SAVEFILE_DATABASE)
         database_file = os.path.expanduser(self.getConfig('','local_dir')+
@@ -547,8 +544,6 @@
         """Load parameters and all memory things from file/db"""
         param_file_xml = os.path.expanduser(self.getConfig('','local_dir')+
                                         self.host.get_const('savefile_param_xml'))
-        history_file = os.path.expanduser(self.getConfig('','local_dir')+
-                                        self.host.get_const('savefile_history'))
         private_file = os.path.expanduser(self.getConfig('','local_dir')+
                                         self.host.get_const('savefile_private'))
 
@@ -564,16 +559,6 @@
             info (_("No params template, using default template"))
             self.params.load_default_params()
 
-        
-        #history
-        if os.path.exists(history_file):
-            try:
-                with open(history_file, 'r') as history_pickle:
-                    self.history=pickle.load(history_pickle)
-                debug(_("history loaded"))
-            except:
-                error (_("Can't load history !"))
-
         #private
         if os.path.exists(private_file):
             try:
@@ -604,16 +589,11 @@
         #TODO: need to encrypt files (at least passwords !) and set permissions
         param_file_xml = os.path.expanduser(self.getConfig('','local_dir')+
                                         self.host.get_const('savefile_param_xml'))
-        history_file = os.path.expanduser(self.getConfig('','local_dir')+
-                                        self.host.get_const('savefile_history'))
         private_file = os.path.expanduser(self.getConfig('','local_dir')+
                                         self.host.get_const('savefile_private'))
         
         self.params.save_xml(param_file_xml)
         debug(_("params saved"))
-        with open(history_file, 'w') as history_pickle:
-            pickle.dump(self.history, history_pickle)
-        debug(_("history saved"))
         with open(private_file, 'w') as private_pickle:
             pickle.dump(self.private, private_pickle)
         debug(_("private values saved"))
@@ -645,39 +625,12 @@
         @param name: Name of the profile"""
         return self.params.deleteProfile(name)
 
-    def addToHistory(self, me_jid, from_jid, to_jid, type, message):
-        me_short=me_jid.userhost()
-        from_short=from_jid.userhost()
-        to_short=to_jid.userhost()
-
-        if from_jid==me_jid:
-            key=to_short
-        else:
-            key=from_short
-
-        if not self.history.has_key(me_short):
-            self.history[me_short]={}
-        if not self.history[me_short].has_key(key):
-            self.history[me_short][key]={}
+    def addToHistory(self, from_jid, to_jid, message, timestamp=None, profile="@NONE@"):
+        assert(profile!="@NONE@")
+        return self.storage.addToHistory(from_jid, to_jid, message, timestamp, profile)
 
-        self.history[me_short][key][int(time.time())] = (from_jid.full(), message)
-        
-    def getHistory(self, from_jid, to_jid, size):
-        ret={}
-        if not self.history.has_key(from_jid):
-            error(_("source JID not found !"))
-            #TODO: throw an error here
-            return {}
-        if not self.history[from_jid].has_key(to_jid):
-            error(_("dest JID not found !"))
-            #TODO: throw an error here
-            return {}
-        stamps=self.history[from_jid][to_jid].keys()
-        stamps.sort()
-        for stamp in stamps[-size:]:
-            ret[stamp]=self.history[from_jid][to_jid][stamp]
-
-        return ret
+    def getHistory(self, from_jid, to_jid, limit=0, between=True):
+        return self.storage.getHistory(jid.JID(from_jid), jid.JID(to_jid), limit, between)
 
     def setPrivate(self, key, value):
         """Save a misc private value (mainly useful for plugins)"""
--- a/src/tools/sqlite.py	Mon Nov 07 22:27:07 2011 +0100
+++ b/src/tools/sqlite.py	Tue Nov 08 01:08:11 2011 +0100
@@ -24,6 +24,7 @@
 from twisted.enterprise import adbapi
 from twisted.internet import defer
 import os.path
+import time
 
 class SqliteStorage():
     """This class manage storage with Sqlite database"""
@@ -44,7 +45,7 @@
             info(_("The database is new, creating the tables"))
             database_creation = [
             "CREATE TABLE profiles (id INTEGER PRIMARY KEY ASC, name TEXT, UNIQUE (name))",
-            "CREATE TABLE historic (id INTEGER PRIMARY KEY ASC, profile_id INTEGER, source TEXT, dest TEXT, source_res TEXT, dest_res TEXT, timestamp DATETIME, message TEXT, FOREIGN KEY(profile_id) REFERENCES profiles(id))",
+            "CREATE TABLE history (id INTEGER PRIMARY KEY ASC, profile_id INTEGER, source TEXT, dest TEXT, source_res TEXT, dest_res TEXT, timestamp DATETIME, message TEXT, FOREIGN KEY(profile_id) REFERENCES profiles(id))",
             "CREATE TABLE param_gen (category TEXT, name TEXT, value TEXT, PRIMARY KEY (category,name))",
             "CREATE TABLE param_ind (category TEXT, name TEXT, profile_id INTEGER, value TEXT, PRIMARY KEY (category,name,profile_id), FOREIGN KEY(profile_id) REFERENCES profiles(id))"]
             for op in database_creation:
@@ -162,6 +163,59 @@
         d.addErrback(lambda ignore: error(_("Can't set individual parameter (%(category)s/%(name)s) for [%(profile)s] in database" % {"category":category, "name":name, "profile":profile})))
         return d
 
+    #History
+    def addToHistory(self, from_jid, to_jid, message, timestamp=None, profile=None):
+        """Store a new message in history
+        @param from_jid: full source JID
+        @param to_jid: full dest JID
+        @param message: message
+        @param timestamp: timestamp in seconds since epoch, or None to use current time
+        """
+        assert(profile!=None)
+        d = self.dbpool.runQuery("INSERT INTO history(source, source_res, dest, dest_res, timestamp, message, profile_id) VALUES (?,?,?,?,?,?,?)",
+                                (from_jid.userhost(), from_jid.resource, to_jid.userhost(), to_jid.resource, timestamp or int(time.time()),
+                                message, self.profiles[profile]))
+        d.addErrback(lambda ignore: error(_("Can't save following message in history: from [%(from_jid)s] to [%(to_jid)s] ==> [%(message)s]" %
+                                         {"from_jid":from_jid.full(), "to_jid":to_jid.full(), "message":message})))
+        return d
+
+    def getHistory(self, from_jid, to_jid, limit=0, between=True):
+        """Store a new message in history
+        @param from_jid: source JID (full, or bare for catchall
+        @param to_jid: dest JID (full, or bare for catchall
+        @param size: maximum number of messages to get, or 0 for unlimited
+        """
+        def sqliteToDict(result):
+            result_dict = {}
+            for row in result:
+                timestamp, source, source_res, dest, dest_res, message = row
+                result_dict[timestamp] = ("%s/%s" % (source, source_res) if source_res else source,
+                                          "%s/%s" % (dest, dest_res) if dest_res else dest,
+                                          message)
+            return result_dict
+        
+        query_parts = ["SELECT timestamp, source, source_res, dest, dest_res, message FROM history WHERE"]
+        values = []
+
+        if between:
+            query_parts.append("(source=? OR source=?) AND (dest=? or dest=?)")
+            values.extend([from_jid.userhost(), to_jid.userhost(), to_jid.userhost(), from_jid.userhost()])
+        else:
+            query_parts.append("source=? AND dest=?")
+            values.extend([from_jid.userhost(), to_jid.userhost()])
+        if from_jid.resource:
+            query_parts.append("AND source_res=?")
+            values.append(from_jid.resource)
+        if to_jid.resource:
+            query_parts.append("AND dest_res=?")
+            values.append(to_jid.resource)
+        if limit:
+            query_parts.append("LIMIT ?")
+            values.append(limit)
+        d = self.dbpool.runQuery(" ".join(query_parts), values)
+        return d.addCallback(sqliteToDict)
+
+
     ##Helper methods##
 
     def __getFirstResult(self, result):