diff sat/memory/sqlite.py @ 2699:310e41bd6666

core (memory/sqlite): added stanza_id: /!\ database schema change /!\ stanza_id is a new field in history added to prepare the implementation of MAM for messages. A new "last_stanza_id" can be used in filters to retrieve the last message with a known stanza id (useful for history synchronisation).
author Goffi <goffi@goffi.org>
date Sat, 01 Dec 2018 10:08:17 +0100
parents 26edcf3a30eb
children 9adf44996e58
line wrap: on
line diff
--- a/sat/memory/sqlite.py	Sat Dec 01 10:04:17 2018 +0100
+++ b/sat/memory/sqlite.py	Sat Dec 01 10:08:17 2018 +0100
@@ -36,7 +36,7 @@
 import sqlite3
 import json
 
-CURRENT_DB_VERSION = 5
+CURRENT_DB_VERSION = 6
 
 # XXX: DATABASE schemas are used in the following way:
 #      - 'current' key is for the actual database schema, for a new base
@@ -56,7 +56,7 @@
                                                    ("FOREIGN KEY(profile_id) REFERENCES profiles(id) ON DELETE CASCADE",))),
                               ('message_types',   (("type TEXT PRIMARY KEY",),
                                   ())),
-                              ('history',         (("uid TEXT PRIMARY KEY", "update_uid TEXT", "profile_id INTEGER", "source TEXT", "dest TEXT", "source_res TEXT", "dest_res TEXT",
+                              ('history',         (("uid TEXT PRIMARY KEY", "stanza_id TEXT", "update_uid TEXT", "profile_id INTEGER", "source TEXT", "dest TEXT", "source_res TEXT", "dest_res TEXT",
                                                     "timestamp DATETIME NOT NULL", "received_timestamp DATETIME", # XXX: timestamp is the time when the message was emitted. If received time stamp is not NULL, the message was delayed and timestamp is the declared value (and received_timestamp the time of reception)
                                                     "type TEXT", "extra BLOB"),
                                                    ("FOREIGN KEY(profile_id) REFERENCES profiles(id) ON DELETE CASCADE", "FOREIGN KEY(type) REFERENCES message_types(type)",
@@ -98,6 +98,8 @@
                                                 )),
                               )),
                     },
+        6:         {'cols create': {'history': ('stanza_id TEXT',)},
+                   },
         5:         {'create': {'files': (("id TEXT NOT NULL", "version TEXT NOT NULL", "parent TEXT NOT NULL",
                                           "type TEXT CHECK(type in ('{file}', '{directory}')) NOT NULL DEFAULT '{file}'".format(
                                               file=C.FILE_TYPE_FILE, directory=C.FILE_TYPE_DIRECTORY),
@@ -117,7 +119,7 @@
                    },
         }
 
-NOT_IN_EXTRA = ('received_timestamp', 'update_uid') # keys which are in message data extra but not stored in sqlite's extra field
+NOT_IN_EXTRA = ('stanza_id', 'received_timestamp', 'update_uid') # keys which are in message data extra but not stored in sqlite's extra field
                                                     # this is specific to this sqlite storage and for now only used for received_timestamp
                                                     # because this value is stored in a separate field
 
@@ -423,8 +425,8 @@
         extra = pickle.dumps({k: v for k, v in data['extra'].iteritems() if k not in NOT_IN_EXTRA}, 0)
         from_jid = data['from']
         to_jid = data['to']
-        d = self.dbpool.runQuery("INSERT INTO history(uid, update_uid, profile_id, source, dest, source_res, dest_res, timestamp, received_timestamp, type, extra) VALUES (?,?,?,?,?,?,?,?,?,?,?)",
-                                 (data['uid'], data['extra'].get('update_uid'), self.profiles[profile], data['from'].userhost(), to_jid.userhost(), from_jid.resource, to_jid.resource, data['timestamp'], data.get('received_timestamp'), data['type'], sqlite3.Binary(extra)))
+        d = self.dbpool.runQuery("INSERT INTO history(uid, stanza_id, update_uid, profile_id, source, dest, source_res, dest_res, timestamp, received_timestamp, type, extra) VALUES (?,?,?,?,?,?,?,?,?,?,?,?)",
+                                 (data['uid'], data['extra'].get('stanza_id'), data['extra'].get('update_uid'), self.profiles[profile], data['from'].userhost(), to_jid.userhost(), from_jid.resource, to_jid.resource, data['timestamp'], data.get('received_timestamp'), data['type'], sqlite3.Binary(extra)))
         d.addCallbacks(self._addToHistoryCb, self._addToHistoryEb, callbackArgs=[data], errbackArgs=[data])
         d.addErrback(self._logHistoryError, from_jid, to_jid, data)
         return d
@@ -434,7 +436,7 @@
         result = []
         current = {'uid': None}
         for row in reversed(query_result):
-            uid, update_uid, source, dest, source_res, dest_res, timestamp, received_timestamp,\
+            uid, stanza_id, update_uid, source, dest, source_res, dest_res, timestamp, received_timestamp,\
             type_, extra, message, message_lang, subject, subject_lang, thread, thread_parent = row
             if uid != current['uid']:
                 # new message
@@ -452,6 +454,8 @@
                     'extra': extra,
                     'timestamp': timestamp,
                     }
+                if stanza_id is not None:
+                    current['extra']['stanza_id'] = stanza_id
                 if update_uid is not None:
                     current['extra']['update_uid'] = update_uid
                 if received_timestamp is not None:
@@ -492,7 +496,7 @@
             - 0 for no message (returns the empty list)
             - None for unlimited
         @param between (bool): confound source and dest (ignore the direction)
-        @param search (unicode): pattern to filter the history results
+        @param filters (dict[unicode, unicode]): pattern to filter the history results
         @param profile (unicode): %(doc_profile)s
         @return: list of tuple as in [messageNew]
         """
@@ -502,29 +506,54 @@
         if limit == 0:
             return defer.succeed([])
 
-        query_parts = [u"SELECT uid, update_uid, source, dest, source_res, dest_res, timestamp, received_timestamp,\
+        query_parts = [u"SELECT uid, stanza_id, update_uid, source, dest, source_res, dest_res, timestamp, received_timestamp,\
                         type, extra, message, message.language, subject, subject.language, thread_id, thread.parent_id\
                         FROM history LEFT JOIN message ON history.uid = message.history_uid\
                         LEFT JOIN subject ON history.uid=subject.history_uid\
                         LEFT JOIN thread ON history.uid=thread.history_uid\
-                        WHERE profile_id=? AND"] # FIXME: not sure if it's the best request, messages and subjects can appear several times here
+                        WHERE profile_id=?"] # FIXME: not sure if it's the best request, messages and subjects can appear several times here
         values = [self.profiles[profile]]
 
-        def test_jid(type_, _jid):
-            values.append(_jid.userhost())
-            if _jid.resource:
-                values.append(_jid.resource)
-                return u'(%s=? AND %s_res=?)' % (type_, type_)
-            return u'%s=?' % (type_, )
+        def test_jid(type_, jid_):
+            values.append(jid_.userhost())
+            if jid_.resource:
+                values.append(jid_.resource)
+                return u'({type_}=? AND {type_}_res=?)'.format(type_=type_)
+            return u'{type_}=?'.format(type_=type_)
 
-        if between:
-            query_parts.append(u"((%s AND %s) OR (%s AND %s))" % (test_jid('source', from_jid),
-                                                             test_jid('dest', to_jid),
-                                                             test_jid('source', to_jid),
-                                                             test_jid('dest', from_jid)))
+        if not from_jid and not to_jid:
+            # no jid specified, we want all one2one communications
+            pass
+        elif between:
+            if not from_jid or not to_jid:
+                # we only have one jid specified, we check all messages
+                # from or to this jid
+                jid_ = from_jid or to_jid
+                query_parts.append(u"AND ({source} OR {dest})".format(
+                    source=test_jid(u'source', jid_),
+                    dest=test_jid(u'dest' , jid_)))
+            else:
+                # we have 2 jids specified, we check all communications between
+                # those 2 jids
+                query_parts.append(
+                    u"AND (({source_from} AND {dest_to}) "
+                    u"OR ({source_to} AND {dest_from}))".format(
+                    source_from=test_jid('source', from_jid),
+                    dest_to=test_jid('dest', to_jid),
+                    source_to=test_jid('source', to_jid),
+                    dest_from=test_jid('dest', from_jid)))
         else:
-            query_parts.append(u"%s AND %s" % (test_jid('source', from_jid),
-                                              test_jid('dest', to_jid)))
+            # we want one communication in specific direction (from somebody or
+            # to somebody).
+            q = []
+            if from_jid is not None:
+                q.append(test_jid('source', from_jid))
+            if to_jid is not None:
+                q.append(test_jid('dest', to_jid))
+            query_parts.append(u"AND " + u" AND ".join(q))
+
+        # set to True if "ORDER BY" is already added
+        order = False
 
         if filters:
             if 'body' in filters:
@@ -542,10 +571,20 @@
                 types = filters['not_types'].split()
                 query_parts.append(u"AND type NOT IN ({})".format(u','.join("?"*len(types))))
                 values.extend(types)
-
+            if 'last_stanza_id' in filters:
+                # this request gets the last message with a "stanza_id" that we
+                # have in history. This is mainly used to retrieve messages sent
+                # while we were offline, using MAM (XEP-0313).
+                if (filters[u'last_stanza_id'] is not True
+                    or from_jid is not None or to_jid is not None
+                    or limit != 1):
+                    raise ValueError(u"Unexpected values for last_stanza_id filter")
+                query_parts.append(u"AND stanza_id IS NOT NULL ORDER BY history.rowid DESC")
+                order = True
 
-        query_parts.append(u"ORDER BY timestamp DESC") # we reverse the order in sqliteHistoryToList
-                                                      # we use DESC here so LIMIT keep the last messages
+        if not order:
+            query_parts.append(u"ORDER BY timestamp DESC") # we reverse the order in sqliteHistoryToList
+                                                           # we use DESC here so LIMIT keep the last messages
         if limit is not None:
             query_parts.append(u"LIMIT ?")
             values.append(limit)